diff --git a/config/config.go b/config/config.go
index 57bf895423..b78636fa80 100644
--- a/config/config.go
+++ b/config/config.go
@@ -831,6 +831,12 @@ type MempoolConfig struct {
 	// it's insertion time into the mempool is beyond TTLDuration.
 	TTLNumBlocks int64 `mapstructure:"ttl-num-blocks"`
 
+	// TxEnqueueTimeout defines how long a new mempool transaction will wait when
+	// the internal processing queue is full (most likely due to busy CheckTx execution).
+	// Once the timeout is reached, the transaction will be silently dropped.
+	// If set to 0, the timeout is disabled and transactions will wait indefinitely.
+	TxEnqueueTimeout time.Duration `mapstructure:"tx-enqueue-timeout"`
+
 	// Timeout of check TX operations received from other nodes.
 	// Use 0 to disable.
 	TimeoutCheckTx time.Duration `mapstructure:"timeout-check-tx"`
@@ -880,6 +886,9 @@ func (cfg *MempoolConfig) ValidateBasic() error {
 	if cfg.TTLNumBlocks < 0 {
 		return errors.New("ttl-num-blocks can't be negative")
 	}
+	if cfg.TxEnqueueTimeout < 0 {
+		return errors.New("tx-enqueue-timeout can't be negative")
+	}
 	if cfg.TimeoutCheckTx < 0 {
 		return errors.New("timeout-check-tx can't be negative")
 	}
diff --git a/config/toml.go b/config/toml.go
index fe678f1eb0..c05ea6fb0b 100644
--- a/config/toml.go
+++ b/config/toml.go
@@ -433,6 +433,14 @@ ttl-duration = "{{ .Mempool.TTLDuration }}"
 # it's insertion time into the mempool is beyond ttl-duration.
 ttl-num-blocks = {{ .Mempool.TTLNumBlocks }}
 
+# tx-enqueue-timeout defines how long a new mempool transaction (received
+# from other nodes) will wait when the internal processing queue is full
+# (most likely due to busy CheckTx execution). Once the timeout is reached,
+# the transaction will be silently dropped.
+#
+# If set to 0, the timeout is disabled and transactions will wait indefinitely.
+tx-enqueue-timeout = "{{ .Mempool.TxEnqueueTimeout }}"
+
 # Timeout of check TX operations received from other nodes, using p2p protocol.
 # Use 0 to disable.
 timeout-check-tx = "{{ .Mempool.TimeoutCheckTx }}"
diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go
index f29f990b04..21aa67df43 100644
--- a/internal/blocksync/reactor_test.go
+++ b/internal/blocksync/reactor_test.go
@@ -63,7 +63,7 @@ func setup(
 	rts := &reactorTestSuite{
 		config:   conf,
 		logger:   log.NewTestingLogger(t).With("module", "block_sync", "testCase", t.Name()),
-		network:  p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{Config: conf, NumNodes: numNodes}),
+		network:  p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{Config: conf, NumNodes: numNodes}, log.NewNopLogger()),
 		nodes:    make([]types.NodeID, 0, numNodes),
 		reactors: make(map[types.NodeID]*Reactor, numNodes),
 		app:      make(map[types.NodeID]abciclient.Client, numNodes),
diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go
index b5a0a03a39..07cc40dc37 100644
--- a/internal/consensus/reactor_test.go
+++ b/internal/consensus/reactor_test.go
@@ -81,7 +81,7 @@ func setup(
 		privProTxHashes[i] = state.privValidator.ProTxHash
 	}
 	rts := &reactorTestSuite{
-		network:  p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes, ProTxHashes: privProTxHashes}),
+		network:  p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes, ProTxHashes: privProTxHashes}, log.NewNopLogger()),
 		states:   make(map[types.NodeID]*State),
 		reactors: make(map[types.NodeID]*Reactor, numNodes),
 		subs:     make(map[types.NodeID]eventbus.Subscription, numNodes),
diff --git a/internal/evidence/reactor_test.go b/internal/evidence/reactor_test.go
index 5fb858511c..845273d389 100644
--- a/internal/evidence/reactor_test.go
+++ b/internal/evidence/reactor_test.go
@@ -58,7 +58,7 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store) *reactorTe
 	rts := &reactorTestSuite{
 		numStateStores: numStateStores,
 		logger:         log.NewNopLogger().With("testCase", t.Name()),
-		network:        p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numStateStores}),
+		network:        p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numStateStores}, log.NewNopLogger()),
 		reactors:       make(map[types.NodeID]*evidence.Reactor, numStateStores),
 		pools:          make(map[types.NodeID]*evidence.Pool, numStateStores),
 		peerUpdates:    make(map[types.NodeID]*p2p.PeerUpdates, numStateStores),
diff --git a/internal/mempool/reactor_test.go b/internal/mempool/reactor_test.go
index 6ebbdae2ca..d857c68e1e 100644
--- a/internal/mempool/reactor_test.go
+++ b/internal/mempool/reactor_test.go
@@ -47,7 +47,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode
 	rts := &reactorTestSuite{
 		logger:   log.NewNopLogger().With("testCase", t.Name()),
-		network:  p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}),
+		network:  p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}, log.NewNopLogger()),
 		reactors: make(map[types.NodeID]*Reactor, numNodes),
 		mempools: make(map[types.NodeID]*TxMempool, numNodes),
 		kvstores: make(map[types.NodeID]*kvstore.Application, numNodes),
diff --git a/internal/p2p/channel_params.go b/internal/p2p/channel_params.go
index 8a0aa8418b..62c168b2a0 100644
--- a/internal/p2p/channel_params.go
+++ b/internal/p2p/channel_params.go
@@ -68,6 +68,7 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor {
 			RecvMessageCapacity: mempoolBatchSize(cfg.Mempool.MaxTxBytes),
 			RecvBufferCapacity:  128,
 			Name:                "mempool",
+			EnqueueTimeout:      cfg.Mempool.TxEnqueueTimeout,
 		},
 		SnapshotChannel: {
 			ID: SnapshotChannel,
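Taken together, the config changes wire the new `tx-enqueue-timeout` option from `MempoolConfig` into the mempool channel descriptor. A minimal sketch of setting and validating it programmatically — the import path is assumed to match this repo's module path, and the 10ms value is purely illustrative:

```go
package main

import (
	"fmt"
	"time"

	"github.com/tendermint/tendermint/config" // assumed module path
)

func main() {
	cfg := config.DefaultConfig()
	// Drop inbound mempool txs that wait more than 10ms for a CheckTx slot.
	cfg.Mempool.TxEnqueueTimeout = 10 * time.Millisecond
	// The new ValidateBasic check rejects negative values.
	if err := cfg.Mempool.ValidateBasic(); err != nil {
		panic(err)
	}
	fmt.Println("tx-enqueue-timeout:", cfg.Mempool.TxEnqueueTimeout)
}
```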
diff --git a/internal/p2p/conn/connection.go b/internal/p2p/conn/connection.go
index 1ecdd1a4dc..6602781feb 100644
--- a/internal/p2p/conn/connection.go
+++ b/internal/p2p/conn/connection.go
@@ -612,16 +612,23 @@ type ChannelDescriptor struct {
 	Priority int
 
 	// TODO: Remove once p2p refactor is complete.
-	SendQueueCapacity   int
+	SendQueueCapacity int
+	// RecvMessageCapacity defines the max message size for a given p2p Channel.
 	RecvMessageCapacity int
 
-	// RecvBufferCapacity defines the max buffer size of inbound messages for a
+	// RecvBufferCapacity defines the max number of inbound messages for a
 	// given p2p Channel queue.
 	RecvBufferCapacity int
 
 	// Human readable name of the channel, used in logging and
 	// diagnostics.
 	Name string
+
+	// Timeout for enqueue operations on the incoming queue.
+	// When the timeout expires, messages will be silently dropped.
+	//
+	// If zero, enqueue operations will not time out.
+	EnqueueTimeout time.Duration
 }
 
 func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go
index 49d0d74722..e9f6d9abde 100644
--- a/internal/p2p/p2ptest/network.go
+++ b/internal/p2p/p2ptest/network.go
@@ -54,9 +54,8 @@ func (opts *NetworkOptions) setDefaults() {
 
 // MakeNetwork creates a test network with the given number of nodes and
 // connects them to each other.
-func MakeNetwork(ctx context.Context, t *testing.T, opts NetworkOptions) *Network {
+func MakeNetwork(ctx context.Context, t *testing.T, opts NetworkOptions, logger log.Logger) *Network {
 	opts.setDefaults()
-	logger := log.NewNopLogger()
 	network := &Network{
 		Nodes:  map[types.NodeID]*Node{},
 		logger: logger,
diff --git a/internal/p2p/p2ptest/require.go b/internal/p2p/p2ptest/require.go
index af49bc18eb..16804ae81a 100644
--- a/internal/p2p/p2ptest/require.go
+++ b/internal/p2p/p2ptest/require.go
@@ -25,7 +25,8 @@ func RequireEmpty(ctx context.Context, t *testing.T, channels ...p2p.Channel) {
 		count := 0
 		for iter.Next(ctx) {
 			count++
-			require.Nil(t, iter.Envelope())
+			e := iter.Envelope()
+			require.Nil(t, e, "received unexpected message %v", e.Message)
 		}
 		require.Zero(t, count)
 		require.Error(t, ctx.Err())
diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go
index b88dc1b6c3..cfd8c25288 100644
--- a/internal/p2p/pex/reactor_test.go
+++ b/internal/p2p/pex/reactor_test.go
@@ -380,7 +380,7 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
 	rts := &reactorTestSuite{
 		logger:      log.NewNopLogger().With("testCase", t.Name()),
-		network:     p2ptest.MakeNetwork(ctx, t, networkOpts),
+		network:     p2ptest.MakeNetwork(ctx, t, networkOpts, log.NewNopLogger()),
 		reactors:    make(map[types.NodeID]*pex.Reactor, realNodes),
 		pexChannels: make(map[types.NodeID]p2p.Channel, opts.TotalNodes),
 		peerChans:   make(map[types.NodeID]chan p2p.PeerUpdate, opts.TotalNodes),
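For reference, a hedged sketch of a descriptor opting into the new field. Everything except `EnqueueTimeout` already existed; the channel ID, capacities, and the 50ms value are made up. Since `ChannelDescriptor` lives in an internal package, this compiles only inside the repo:

```go
package p2p_test // illustrative placement inside the repo

import (
	"time"

	"github.com/tendermint/tendermint/internal/p2p" // assumed module path
)

// exampleChannelDescriptor shows the new field in context. All values here
// are illustrative; only EnqueueTimeout is new in this diff.
func exampleChannelDescriptor() *p2p.ChannelDescriptor {
	return &p2p.ChannelDescriptor{
		ID:                  0x99, // hypothetical channel ID
		Priority:            5,
		SendQueueCapacity:   100,
		RecvMessageCapacity: 1024,
		RecvBufferCapacity:  32,
		Name:                "example",
		// Silently drop inbound messages that cannot be handed to the
		// reactor within 50ms; zero keeps the old wait-forever behavior.
		EnqueueTimeout: 50 * time.Millisecond,
	}
}
```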
diff --git a/internal/p2p/pqueue.go b/internal/p2p/pqueue.go
index 350c38ce0b..eea4ea2142 100644
--- a/internal/p2p/pqueue.go
+++ b/internal/p2p/pqueue.go
@@ -99,13 +99,16 @@ func newPQScheduler(
 	logger log.Logger,
 	m *Metrics,
 	lc *metricsLabelCache,
-	chDescs []*ChannelDescriptor,
+	chDescs map[ChannelID]*ChannelDescriptor,
 	enqueueBuf, dequeueBuf, capacity uint,
 ) *pqScheduler {
 	// copy each ChannelDescriptor and sort them by ascending channel priority
-	chDescsCopy := make([]*ChannelDescriptor, len(chDescs))
-	copy(chDescsCopy, chDescs)
+	chDescsCopy := make([]*ChannelDescriptor, 0, len(chDescs))
+	for _, chDesc := range chDescs {
+		chDescsCopy = append(chDescsCopy, chDesc)
+	}
+
 	sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority < chDescsCopy[j].Priority })
 
 	var (
diff --git a/internal/p2p/pqueue_test.go b/internal/p2p/pqueue_test.go
index 3e1594d79c..7058b7e4cc 100644
--- a/internal/p2p/pqueue_test.go
+++ b/internal/p2p/pqueue_test.go
@@ -14,8 +14,8 @@ type testMessage = gogotypes.StringValue
 
 func TestCloseWhileDequeueFull(t *testing.T) {
 	enqueueLength := 5
-	chDescs := []*ChannelDescriptor{
-		{ID: 0x01, Priority: 1},
+	chDescs := map[ChannelID]*ChannelDescriptor{
+		0x01: {ID: 0x01, Priority: 1},
 	}
 
 	pqueue := newPQScheduler(log.NewNopLogger(), NopMetrics(), newMetricsLabelCache(), chDescs, uint(enqueueLength), 1, 120)
diff --git a/internal/p2p/router.go b/internal/p2p/router.go
index 2dac816c60..31830139bd 100644
--- a/internal/p2p/router.go
+++ b/internal/p2p/router.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"net"
+	"reflect"
 	"runtime"
 	"time"
 
@@ -151,7 +152,7 @@ type Router struct {
 	options     RouterOptions
 	privKey     crypto.PrivKey
 	peerManager *PeerManager
-	chDescs     []*ChannelDescriptor
+	chDescs     map[ChannelID]*ChannelDescriptor
 	transport   Transport
 	endpoint    *Endpoint
 	connTracker connectionTracker
@@ -198,7 +199,7 @@ func NewRouter(
 			options.MaxIncomingConnectionAttempts,
 			options.IncomingConnectionWindow,
 		),
-		chDescs:            make([]*ChannelDescriptor, 0),
+		chDescs:            make(map[ChannelID]*ChannelDescriptor),
 		transport:          transport,
 		endpoint:           endpoint,
 		peerManager:        peerManager,
@@ -256,7 +257,7 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (Ch
 	if _, ok := r.channelQueues[id]; ok {
 		return nil, fmt.Errorf("channel %v already exists", id)
 	}
-	r.chDescs = append(r.chDescs, chDesc)
+	r.chDescs[id] = chDesc
 
 	queue := r.queueFactory(chDesc.RecvBufferCapacity)
 	outCh := make(chan Envelope, chDesc.RecvBufferCapacity)
@@ -698,6 +699,11 @@ func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connec
 	r.metrics.PeersConnected.Add(1)
 	r.peerManager.Ready(ctx, peerID, channels)
 
+	// ioCtx manages the lifecycle of the peer's I/O goroutines; note that the
+	// original ctx is still used in the cleanup below.
+	ioCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	sendQueue := r.getOrMakeQueue(peerID, channels)
 	defer func() {
 		r.peerMtx.Lock()
@@ -705,6 +711,7 @@ func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connec
 		delete(r.peerChannels, peerID)
 		r.peerMtx.Unlock()
 
+		_ = conn.Close()
 		sendQueue.close()
 
 		r.peerManager.Disconnected(ctx, peerID)
@@ -714,43 +721,68 @@ func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connec
 	r.logger.Info("peer connected", "peer", peerID, "endpoint", conn)
 
 	errCh := make(chan error, 2)
+	wg := sync.WaitGroup{}
+	wg.Add(2)
 
 	go func() {
 		select {
-		case errCh <- r.receivePeer(ctx, peerID, conn):
-		case <-ctx.Done():
+		case errCh <- r.receivePeer(ioCtx, peerID, conn):
+		case <-ioCtx.Done():
 		}
+		wg.Done()
 	}()
 
 	go func() {
 		select {
-		case errCh <- r.sendPeer(ctx, peerID, conn, sendQueue):
-		case <-ctx.Done():
+		case errCh <- r.sendPeer(ioCtx, peerID, conn, sendQueue):
+		case <-ioCtx.Done():
 		}
+		wg.Done()
 	}()
 
-	var err error
+	// wait for an error from the first goroutine to finish
+	var (
+		err    error
+		ctxErr error
+	)
+
 	select {
 	case err = <-errCh:
-	case <-ctx.Done():
+		r.logger.Debug("routePeer: received error from subroutine 1", "peer", peerID, "err", err)
+	case <-ioCtx.Done():
+		r.logger.Debug("routePeer: ctx done", "peer", peerID)
+		ctxErr = ioCtx.Err()
 	}
 
+	// goroutine 1 has finished, so we can cancel the context and close everything
+	cancel()
 	_ = conn.Close()
 	sendQueue.close()
 
-	select {
-	case <-ctx.Done():
-	case e := <-errCh:
-		// The first err was nil, so we update it with the second err, which may
-		// or may not be nil.
-		if err == nil {
-			err = e
+	r.logger.Debug("routePeer: closed conn and send queue, waiting for all goroutines to finish", "peer", peerID, "err", err)
+	wg.Wait()
+	r.logger.Debug("routePeer: all goroutines finished", "peer", peerID, "err", err)
+
+	// Drain the error channel; these errors should typically not be interesting
+FOR:
+	for {
+		select {
+		case e := <-errCh:
+			r.logger.Debug("routePeer: received error when draining errCh", "peer", peerID, "err", e)
+			// if we received a non-context error, we should return it
+			if err == nil && !errors.Is(e, context.Canceled) && !errors.Is(e, context.DeadlineExceeded) {
+				err = e
+			}
+		default:
+			break FOR
 		}
 	}
+	close(errCh)
 
-	// if the context was canceled
-	if e := ctx.Err(); err == nil && e != nil {
-		err = e
+	// if the context was canceled, and no other error was received on errCh
+	if err == nil {
+		err = ctxErr
 	}
 
 	switch err {
@@ -764,6 +796,9 @@ func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connec
 // receivePeer receives inbound messages from a peer, deserializes them and
 // passes them on to the appropriate channel.
 func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Connection) error {
+	timeout := time.NewTimer(0)
+	defer timeout.Stop()
+
 	for {
 		chID, bz, err := conn.ReceiveMessage(ctx)
 		if err != nil {
@@ -771,11 +806,14 @@ func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Conn
 		}
 
 		r.channelMtx.RLock()
-		queue, ok := r.channelQueues[chID]
+		queue, queueOk := r.channelQueues[chID]
+		chDesc, chDescOk := r.chDescs[chID]
 		r.channelMtx.RUnlock()
 
-		if !ok {
-			r.logger.Debug("dropping message for unknown channel", "peer", peerID, "channel", chID)
+		if !queueOk || !chDescOk {
+			r.logger.Debug("dropping message for unknown channel",
+				"peer", peerID, "channel", chID,
+				"queue", queueOk, "chDesc", chDescOk)
 			continue
 		}
 
@@ -784,7 +822,6 @@ func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Conn
 			r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err)
 			continue
 		}
-		start := time.Now().UTC()
 
 		envelope, err := EnvelopeFromProto(protoEnvelope)
 		if err != nil {
 			r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err)
@@ -792,6 +829,19 @@ func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Conn
 		}
 		envelope.From = peerID
 		envelope.ChannelID = chID
+
+		// stop the previous timer and drain the timeout channel
+		timeout.Stop()
+		select {
+		case <-timeout.C:
+		default:
+		}
+
+		if chDesc.EnqueueTimeout > 0 {
+			timeout.Reset(chDesc.EnqueueTimeout)
+		}
+		start := time.Now().UTC()
+
 		select {
 		case queue.enqueue() <- envelope:
 			r.metrics.PeerReceiveBytesTotal.With(
@@ -804,7 +854,18 @@ func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Conn
 		case <-queue.closed():
 			r.logger.Debug("channel closed, dropping message", "peer", peerID, "channel", chID)
 
+		case <-timeout.C:
+			r.logger.Debug("dropping message from peer due to enqueue timeout",
+				"peer", peerID,
+				"channel", chID,
+				"channel_name", chDesc.Name,
+				"timeout", chDesc.EnqueueTimeout.String(),
+				"type", reflect.TypeOf(envelope.Message).Name(),
+				"took", time.Since(start).String(),
+			)
+
 		case <-ctx.Done():
+			r.logger.Debug("receivePeer: ctx is done", "peer", peerID, "channel", chID)
 			return nil
 		}
 	}
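The `receivePeer` hunk above reuses a single `time.Timer` across loop iterations, which requires the stop/drain/reset idiom: `Stop` does not drain the timer's channel, so a stale tick left in `timeout.C` would make the next wait fire immediately. A standalone sketch of the same semantics, with illustrative names:

```go
package main

import (
	"fmt"
	"time"
)

// waitOrTimeout tries to send v on ch, giving up after d (or waiting
// forever when d == 0) — the same semantics receivePeer implements.
func waitOrTimeout(timeout *time.Timer, d time.Duration, ch chan<- int, v int) bool {
	// Stop the timer and drain a stale tick, if any, before reuse.
	timeout.Stop()
	select {
	case <-timeout.C:
	default:
	}
	if d > 0 {
		timeout.Reset(d)
	}
	select {
	case ch <- v:
		return true
	case <-timeout.C: // never fires when d == 0
		return false
	}
}

func main() {
	timeout := time.NewTimer(0)
	defer timeout.Stop()

	ch := make(chan int, 1)
	fmt.Println(waitOrTimeout(timeout, 10*time.Millisecond, ch, 1)) // true: buffer has room
	fmt.Println(waitOrTimeout(timeout, 10*time.Millisecond, ch, 2)) // false: buffer full, times out
}
```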
diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go
index 05fdf33aba..d8cd615c87 100644
--- a/internal/p2p/router_test.go
+++ b/internal/p2p/router_test.go
@@ -3,8 +3,10 @@ package p2p_test
 import (
 	"context"
 	"errors"
+	"fmt"
 	"io"
 	"runtime"
+	"strconv"
 	"strings"
 	"testing"
 	"time"
@@ -12,6 +14,7 @@ import (
 	"github.com/fortytw2/leaktest"
 	gogotypes "github.com/gogo/protobuf/types"
 	sync "github.com/sasha-s/go-deadlock"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	dbm "github.com/tendermint/tm-db"
@@ -51,7 +54,7 @@ func TestRouter_Network(t *testing.T) {
 	t.Cleanup(leaktest.Check(t))
 
 	// Create a test network and open a channel where all peers run echoReactor.
-	network := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 8})
+	network := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 8}, log.NewNopLogger())
 	local := network.AnyNode()
 	peers := network.Peers(local.NodeID)
 	channels := network.MakeChannels(ctx, t, chDesc)
@@ -107,7 +110,7 @@ func TestRouter_Channel_Basic(t *testing.T) {
 	peerManager, err := p2p.NewPeerManager(ctx, selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
 	require.NoError(t, err)
 
-	testnet := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 1})
+	testnet := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 1}, log.NewNopLogger())
 
 	router, err := p2p.NewRouter(
 		log.NewNopLogger(),
@@ -174,7 +177,7 @@ func TestRouter_Channel_SendReceive(t *testing.T) {
 	t.Cleanup(leaktest.Check(t))
 
 	// Create a test network and open a channel on all nodes.
-	network := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 3})
+	network := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 3}, log.NewNopLogger())
 
 	ids := network.NodeIDs()
 	aID, bID, cID := ids[0], ids[1], ids[2]
@@ -240,7 +243,7 @@ func TestRouter_Channel_Broadcast(t *testing.T) {
 	defer cancel()
 
 	// Create a test network and open a channel on all nodes.
-	network := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 4})
+	network := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 4}, log.NewNopLogger())
 
 	ids := network.NodeIDs()
 	aID, bID, cID, dID := ids[0], ids[1], ids[2], ids[3]
@@ -271,7 +274,7 @@ func TestRouter_Channel_Error(t *testing.T) {
 	defer cancel()
 
 	// Create a test network and open a channel on all nodes.
-	network := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 3})
+	network := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 3}, log.NewNopLogger())
 	network.Start(ctx, t)
 
 	ids := network.NodeIDs()
@@ -874,3 +877,99 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
 	router.Stop()
 	mockTransport.AssertExpectations(t)
 }
+
+// Given a channel with a non-zero enqueue timeout,
+// when I send more messages than the recv channel capacity,
+// and I wait longer than the enqueue timeout,
+// then I should receive only the messages that fit into the recv channel capacity.
+func TestRouter_Channel_Enqueue_Timeout(t *testing.T) {
+	type testCase struct {
+		sendCount         int
+		expectedRecvCount int
+		delay             time.Duration
+	}
+
+	chDesc := &p2p.ChannelDescriptor{
+		ID:                  chID,
+		Priority:            5,
+		SendQueueCapacity:   100,
+		RecvMessageCapacity: 10240,                 // 10 KiB
+		EnqueueTimeout:      10 * time.Millisecond, // FIXME: Check if this doesn't affect other tests
+		RecvBufferCapacity:  10,
+	}
+	const processingTime = 10 * time.Millisecond
+
+	testCases := []testCase{
+		{sendCount: chDesc.RecvBufferCapacity, expectedRecvCount: chDesc.RecvBufferCapacity, delay: 0},
+		{sendCount: chDesc.RecvBufferCapacity * 2, expectedRecvCount: chDesc.RecvBufferCapacity * 2, delay: 0},
+		{sendCount: 1, expectedRecvCount: 1, delay: chDesc.EnqueueTimeout + 10*time.Millisecond},
+		{sendCount: chDesc.RecvBufferCapacity - 1, expectedRecvCount: chDesc.RecvBufferCapacity - 1, delay: chDesc.EnqueueTimeout + processingTime},
+		{sendCount: chDesc.RecvBufferCapacity, expectedRecvCount: chDesc.RecvBufferCapacity, delay: chDesc.EnqueueTimeout + processingTime},
+		{sendCount: chDesc.RecvBufferCapacity + 1, expectedRecvCount: chDesc.RecvBufferCapacity, delay: 2*chDesc.EnqueueTimeout + processingTime},
+		{sendCount: chDesc.RecvBufferCapacity + 5, expectedRecvCount: chDesc.RecvBufferCapacity, delay: 6*chDesc.EnqueueTimeout + processingTime},
+	}
+
+	logger := log.NewTestingLoggerWithLevel(t, log.LogLevelDebug).WithTimestamp()
+
+	t.Cleanup(leaktest.Check(t))
+
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(fmt.Sprintf("send=%d,recv=%d,delay=%s", tc.sendCount, tc.expectedRecvCount, tc.delay), func(t *testing.T) {
+			// timeout that will expire if we don't receive some of the expected messages
+			ctxTimeout := tc.delay + 200*time.Millisecond
+			ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
+			defer cancel()
+
+			// Create a test network and open a channel on all nodes.
+			network := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 2}, logger)
+
+			ids := network.NodeIDs()
+			aID, bID := ids[0], ids[1]
+			channels := network.MakeChannels(ctx, t, chDesc)
+			a, b := channels[aID], channels[bID]
+
+			network.Start(ctx, t)
+
+			wg := sync.WaitGroup{}
+
+			// Start the test: send messages in a goroutine so we don't block on a full channel
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for i := 0; i < tc.sendCount; i++ {
+					sentEnvelope := p2p.Envelope{To: bID, Message: &p2ptest.Message{Value: strconv.Itoa(i)}}
+					p2ptest.RequireSend(ctx, t, a, sentEnvelope)
+					logger.Trace("Sent message", "id", i)
+				}
+			}()
+
+			// sleep to ensure the timeout expired and at least some msgs will be dropped
+			time.Sleep(tc.delay)
+			count := 0
+
+			// check if we received all the messages we expected
+			iter := b.Receive(ctx)
+			for count < tc.expectedRecvCount && iter.Next(ctx) {
+				// this will hang if we don't receive the expected number of messages
+				e := iter.Envelope()
+				logger.Trace("received message", "message", e.Message)
+				count++
+			}
+			logger.Info("received messages", "count", count)
+
+			wg.Wait()
+
+			// this will error if we receive too many messages
+			p2ptest.RequireEmpty(ctx, t, a, b)
+
+			// this will error if we don't receive the expected number of messages
+			assert.NoError(t, ctx.Err(), "timed out, received %d msgs, expected %d", count, tc.expectedRecvCount)
+		})
+	}
+}
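The expected counts in the table follow from buffering: with no reader delay the receive loop drains continuously, so even 2× `RecvBufferCapacity` arrives intact, while after sleeping past `EnqueueTimeout` only the messages already sitting in the buffer survive. A toy model of that behavior using plain channels — no p2p involved, and the 10-slot buffer mirrors `RecvBufferCapacity`:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const capacity = 10
	enqueueTimeout := 10 * time.Millisecond
	queue := make(chan int, capacity) // stands in for the recv buffer

	delivered := 0
	for i := 0; i < capacity+5; i++ {
		select {
		case queue <- i:
			delivered++
		case <-time.After(enqueueTimeout):
			// dropped, like the router does after EnqueueTimeout
		}
	}
	// No reader ran, so only the buffered messages survive.
	fmt.Printf("delivered %d of %d\n", delivered, capacity+5) // delivered 10 of 15
}
```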
diff --git a/internal/p2p/transport_mconn.go b/internal/p2p/transport_mconn.go
index 7d4b3d046f..3757f8f7d4 100644
--- a/internal/p2p/transport_mconn.go
+++ b/internal/p2p/transport_mconn.go
@@ -147,11 +147,15 @@ func (m *MConnTransport) Accept(ctx context.Context) (Connection, error) {
 		if err != nil {
 			select {
 			case errCh <- err:
+			case <-m.doneCh:
+				m.logger.Trace("MConnTransport Accept: connection closed - doneCh")
 			case <-ctx.Done():
 			}
 		}
 
 		select {
 		case conCh <- tcpConn:
+		case <-m.doneCh:
+			m.logger.Trace("MConnTransport Accept: connection closed - doneCh")
 		case <-ctx.Done():
 		}
 	}()
@@ -187,12 +191,10 @@ func (m *MConnTransport) Dial(ctx context.Context, endpoint *Endpoint) (Connecti
 	tcpConn, err := dialer.DialContext(ctx, "tcp", net.JoinHostPort(
 		endpoint.IP.String(), strconv.Itoa(int(endpoint.Port))))
 	if err != nil {
-		select {
-		case <-ctx.Done():
-			return nil, ctx.Err()
-		default:
-			return nil, err
+		if e := ctx.Err(); e != nil {
+			return nil, e
 		}
+		return nil, err
 	}
 
 	return newMConnConnection(m.logger, tcpConn, m.mConnConfig, m.channelDescs), nil
@@ -313,6 +315,7 @@ func (c *mConnConnection) Handshake(
 
 	select {
 	case errCh <- err:
+	case <-c.doneCh:
 	case <-handshakeCtx.Done():
 	}
 
@@ -322,7 +325,8 @@ func (c *mConnConnection) Handshake(
 	case <-handshakeCtx.Done():
 		_ = c.Close()
 		return types.NodeInfo{}, nil, handshakeCtx.Err()
-
+	case <-c.doneCh:
+		return types.NodeInfo{}, nil, io.EOF
 	case err := <-errCh:
 		if err != nil {
 			return types.NodeInfo{}, nil, err
@@ -365,6 +369,7 @@ func (c *mConnConnection) handshake(
 		_, err := protoio.NewDelimitedWriter(secretConn).WriteMsg(nodeInfo.ToProto())
 		select {
 		case errCh <- err:
+		case <-c.doneCh:
 		case <-ctx.Done():
 		}
 
@@ -375,6 +380,7 @@ func (c *mConnConnection) handshake(
 		_, err := protoio.NewDelimitedReader(secretConn, types.MaxNodeInfoSize()).ReadMsg(&pbPeerInfo)
 		select {
 		case errCh <- err:
+		case <-c.doneCh:
 		case <-ctx.Done():
 		}
 	}()
@@ -410,6 +416,7 @@ func (c *mConnConnection) handshake(
 func (c *mConnConnection) onReceive(ctx context.Context, chID ChannelID, payload []byte) {
 	select {
 	case c.receiveCh <- mConnMessage{channelID: chID, payload: payload}:
+	case <-c.doneCh:
 	case <-ctx.Done():
 	}
 }
@@ -426,6 +433,7 @@ func (c *mConnConnection) onError(ctx context.Context, e interface{}) {
 	_ = c.Close()
 	select {
 	case c.errorCh <- err:
+	case <-c.doneCh:
 	case <-ctx.Done():
 	}
 }
@@ -445,6 +453,8 @@ func (c *mConnConnection) SendMessage(ctx context.Context, chID ChannelID, msg [
 		return err
 	case <-ctx.Done():
 		return io.EOF
+	case <-c.doneCh:
+		return io.EOF
 	default:
 		if ok := c.mconn.Send(chID, msg); !ok {
 			return errors.New("sending message timed out")
@@ -458,10 +468,13 @@ func (c *mConnConnection) ReceiveMessage(ctx context.Context) (ChannelID, []byte
 	select {
 	case err := <-c.errorCh:
+		c.logger.Debug("ReceiveMessage: error occurred", "err", err)
 		return 0, nil, err
 	case <-c.doneCh:
+		c.logger.Trace("ReceiveMessage: connection closed - doneCh")
 		return 0, nil, io.EOF
 	case <-ctx.Done():
+		c.logger.Trace("ReceiveMessage: connection closed - ctx.Done()")
 		return 0, nil, io.EOF
 	case msg := <-c.receiveCh:
 		return msg.channelID, msg.payload, nil
@@ -496,6 +509,7 @@ func (c *mConnConnection) RemoteEndpoint() Endpoint {
 func (c *mConnConnection) Close() error {
 	var err error
 	c.closeOnce.Do(func() {
+		c.logger.Trace("mConnConnection.Close(): closing doneCh")
 		defer close(c.doneCh)
 
 		if c.mconn != nil && c.mconn.IsRunning() {
diff --git a/libs/log/testing.go b/libs/log/testing.go
index e356163255..f7e0b47f19 100644
--- a/libs/log/testing.go
+++ b/libs/log/testing.go
@@ -70,6 +70,20 @@ type TestingLogger struct {
 	assertions []assertion
 }
 
+// WithTimestamp returns a new TestingLogger with timestamps enabled.
+func (tw *TestingLogger) WithTimestamp() *TestingLogger {
+	l := TestingLogger{
+		t:          tw.t,
+		assertions: tw.assertions,
+		defaultLogger: defaultLogger{
+			Logger:     tw.defaultLogger.Logger.With().Timestamp().Logger(),
+			closeFuncs: tw.defaultLogger.closeFuncs,
+		},
+	}
+
+	return &l
+}
+
 type assertion struct {
 	match  regexp.Regexp
 	passed bool
diff --git a/test/e2e/runner/setup.go b/test/e2e/runner/setup.go
index b46d101857..5fad95776d 100644
--- a/test/e2e/runner/setup.go
+++ b/test/e2e/runner/setup.go
@@ -218,6 +218,8 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
 		cfg.LogLevel = node.LogLevel
 	}
 
+	cfg.Mempool.TxEnqueueTimeout = 10 * time.Millisecond
+
 	cfg.RPC.ListenAddress = "tcp://0.0.0.0:26657"
 	cfg.RPC.PprofListenAddress = ":6060"
 	cfg.P2P.ExternalAddress = fmt.Sprintf("tcp://%v", node.AddressP2P(false))
diff --git a/test/e2e/tests/app_test.go b/test/e2e/tests/app_test.go
index b14e1ef684..6430f802f6 100644
--- a/test/e2e/tests/app_test.go
+++ b/test/e2e/tests/app_test.go
@@ -214,7 +214,7 @@ func TestApp_TxTooBig(t *testing.T) {
 	}
 
 	/// timeout for broadcast to single node
-	const broadcastTimeout = 5 * time.Second
+	const broadcastTimeout = 10 * time.Second
 	/// Timeout to read response from single node
 	const readTimeout = 1 * time.Second
 	/// Time to process whole mempool
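One shape recurs throughout the transport_mconn.go changes above: every blocking channel handoff also selects on the connection's `doneCh`, so helper goroutines cannot leak when `Close()` runs before a consumer reads. Reduced to its essence — names mirror the diff, and the setup is contrived:

```go
package main

import (
	"context"
	"fmt"
)

// deliver hands result to resultCh unless the connection is closed or the
// context is canceled first — the select shape added throughout
// transport_mconn.go to prevent goroutine leaks on Close().
func deliver(ctx context.Context, resultCh chan<- string, doneCh <-chan struct{}, result string) {
	select {
	case resultCh <- result:
	case <-doneCh: // connection closed; drop instead of blocking forever
	case <-ctx.Done():
	}
}

func main() {
	resultCh := make(chan string) // unbuffered: a bare send would block forever
	doneCh := make(chan struct{})
	exited := make(chan struct{})

	go func() {
		deliver(context.Background(), resultCh, doneCh, "handshake done")
		close(exited)
	}()

	close(doneCh) // simulate Close() before anyone reads resultCh
	<-exited      // without the doneCh case, this would deadlock
	fmt.Println("goroutine exited cleanly")
}
```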