feat(p2p): channel enqueue timeout and improved router cleanup (#754)
* feat: timeout when trying to enqueue incoming mempool messages

* chore: fix panic

* chore: increase enqueue timeout to 10ms

* chore: improve error desc

* fix(p2p): EnqueueTimeout breaks unrelated channels

* chore(p2p): more logging of disconnect errors

* refactor(p2p): refactor routePeer and mConn shutdown

* chore(p2p): drain EnqueueTimeout chan safely

* fix(p2p): peerManager.Disconnected context expires too soon

* chore: self-review

* test(e2e): test consensus enqueue timeout

* chore: self-review

* chore(log): lint

* test(p2p): increase TestRouter_Channel_Enqueue_Timeout ctx timeout for GHA testing

* feat(config): add Mempool.TxEnqueueTimeout variable

* chore: typo

* test(e2e): set TX enqueue timeout to 10 milliseconds
lklimek authored Mar 12, 2024
1 parent 37870a8 commit 5764ab3
Showing 19 changed files with 268 additions and 50 deletions.
9 changes: 9 additions & 0 deletions config/config.go
@@ -831,6 +831,12 @@ type MempoolConfig struct {
// its insertion time into the mempool is beyond TTLDuration.
TTLNumBlocks int64 `mapstructure:"ttl-num-blocks"`

// TxEnqueueTimeout defines how long a new mempool transaction will wait when the internal
// processing queue is full (most likely due to busy CheckTx execution).
// Once the timeout is reached, the transaction will be silently dropped.
// If set to 0, the timeout is disabled and transactions will wait indefinitely.
TxEnqueueTimeout time.Duration `mapstructure:"tx-enqueue-timeout"`

// Timeout of check TX operations received from other nodes.
// Use 0 to disable.
TimeoutCheckTx time.Duration `mapstructure:"timeout-check-tx"`
@@ -880,6 +886,9 @@ func (cfg *MempoolConfig) ValidateBasic() error {
if cfg.TTLNumBlocks < 0 {
return errors.New("ttl-num-blocks can't be negative")
}
if cfg.TxEnqueueTimeout < 0 {
return errors.New("tx-enqueue-timeout can't be negative")
}
if cfg.TimeoutCheckTx < 0 {
return errors.New("timeout-check-tx can't be negative")
}
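For illustration only (not part of this commit): a minimal sketch of setting the new TxEnqueueTimeout field programmatically and validating it. The module import path and the DefaultMempoolConfig helper are assumptions based on the surrounding config package.

package main

import (
	"fmt"
	"time"

	"github.com/dashpay/tenderdash/config" // assumed module path
)

func main() {
	cfg := config.DefaultMempoolConfig() // assumed helper returning default mempool settings
	// Drop incoming mempool messages that cannot be enqueued within 10ms;
	// zero keeps the previous behaviour of waiting indefinitely.
	cfg.TxEnqueueTimeout = 10 * time.Millisecond

	if err := cfg.ValidateBasic(); err != nil {
		fmt.Println("invalid mempool config:", err)
		return
	}
	fmt.Println("tx-enqueue-timeout:", cfg.TxEnqueueTimeout) // 10ms
}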
8 changes: 8 additions & 0 deletions config/toml.go
@@ -433,6 +433,14 @@ ttl-duration = "{{ .Mempool.TTLDuration }}"
# its insertion time into the mempool is beyond ttl-duration.
ttl-num-blocks = {{ .Mempool.TTLNumBlocks }}
# TxEnqueueTimeout defines how long a new mempool transaction (received
# from other nodes) will wait when the internal processing queue is full
# (most likely due to busy CheckTx execution). Once the timeout is reached, the transaction
# will be silently dropped.
#
# If set to 0, the timeout is disabled and transactions will wait indefinitely.
tx-enqueue-timeout = "{{ .Mempool.TxEnqueueTimeout }}"
# Timeout of check TX operations received from other nodes, using p2p protocol.
# Use 0 to disable.
timeout-check-tx = "{{ .Mempool.TimeoutCheckTx }}"
2 changes: 1 addition & 1 deletion internal/blocksync/reactor_test.go
@@ -63,7 +63,7 @@ func setup(
rts := &reactorTestSuite{
config: conf,
logger: log.NewTestingLogger(t).With("module", "block_sync", "testCase", t.Name()),
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{Config: conf, NumNodes: numNodes}),
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{Config: conf, NumNodes: numNodes}, log.NewNopLogger()),
nodes: make([]types.NodeID, 0, numNodes),
reactors: make(map[types.NodeID]*Reactor, numNodes),
app: make(map[types.NodeID]abciclient.Client, numNodes),
2 changes: 1 addition & 1 deletion internal/consensus/reactor_test.go
@@ -81,7 +81,7 @@ func setup(
privProTxHashes[i] = state.privValidator.ProTxHash
}
rts := &reactorTestSuite{
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes, ProTxHashes: privProTxHashes}),
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes, ProTxHashes: privProTxHashes}, log.NewNopLogger()),
states: make(map[types.NodeID]*State),
reactors: make(map[types.NodeID]*Reactor, numNodes),
subs: make(map[types.NodeID]eventbus.Subscription, numNodes),
2 changes: 1 addition & 1 deletion internal/evidence/reactor_test.go
@@ -58,7 +58,7 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store) *reactorTe
rts := &reactorTestSuite{
numStateStores: numStateStores,
logger: log.NewNopLogger().With("testCase", t.Name()),
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numStateStores}),
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numStateStores}, log.NewNopLogger()),
reactors: make(map[types.NodeID]*evidence.Reactor, numStateStores),
pools: make(map[types.NodeID]*evidence.Pool, numStateStores),
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numStateStores),
2 changes: 1 addition & 1 deletion internal/mempool/reactor_test.go
@@ -47,7 +47,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode

rts := &reactorTestSuite{
logger: log.NewNopLogger().With("testCase", t.Name()),
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}),
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}, log.NewNopLogger()),
reactors: make(map[types.NodeID]*Reactor, numNodes),
mempools: make(map[types.NodeID]*TxMempool, numNodes),
kvstores: make(map[types.NodeID]*kvstore.Application, numNodes),
1 change: 1 addition & 0 deletions internal/p2p/channel_params.go
@@ -68,6 +68,7 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor {
RecvMessageCapacity: mempoolBatchSize(cfg.Mempool.MaxTxBytes),
RecvBufferCapacity: 128,
Name: "mempool",
EnqueueTimeout: cfg.Mempool.TxEnqueueTimeout,
},
SnapshotChannel: {
ID: SnapshotChannel,
11 changes: 9 additions & 2 deletions internal/p2p/conn/connection.go
@@ -612,16 +612,23 @@ type ChannelDescriptor struct {
Priority int

// TODO: Remove once p2p refactor is complete.
SendQueueCapacity int
SendQueueCapacity int
// RecvMessageCapacity defines the max message size for a given p2p Channel.
RecvMessageCapacity int

// RecvBufferCapacity defines the max buffer size of inbound messages for a
// RecvBufferCapacity defines the max number of inbound messages for a
// given p2p Channel queue.
RecvBufferCapacity int

// Human readable name of the channel, used in logging and
// diagnostics.
Name string

// Timeout for enqueue operations on the incoming queue.
// When the timeout expires, messages will be silently dropped.
//
// If zero, enqueue operations will not time out.
EnqueueTimeout time.Duration
}

func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
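Not part of the diff: a hedged sketch of how a reactor could declare a channel that uses the new EnqueueTimeout field. The channel ID, name, and values are hypothetical, and descriptor fields not shown in this hunk (such as the protobuf message type) are omitted; imports follow the surrounding p2p package.

// openExampleChannel opens a channel whose inbound queue drops messages that
// cannot be enqueued within 10ms; a zero value would keep the old blocking behaviour.
func openExampleChannel(ctx context.Context, router *p2p.Router) (p2p.Channel, error) {
	chDesc := &p2p.ChannelDescriptor{
		ID:                  0x30, // hypothetical channel ID
		Priority:            5,
		RecvMessageCapacity: 1024,
		RecvBufferCapacity:  128,
		Name:                "example",
		EnqueueTimeout:      10 * time.Millisecond,
	}
	return router.OpenChannel(ctx, chDesc)
}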
3 changes: 1 addition & 2 deletions internal/p2p/p2ptest/network.go
@@ -54,9 +54,8 @@ func (opts *NetworkOptions) setDefaults() {

// MakeNetwork creates a test network with the given number of nodes and
// connects them to each other.
func MakeNetwork(ctx context.Context, t *testing.T, opts NetworkOptions) *Network {
func MakeNetwork(ctx context.Context, t *testing.T, opts NetworkOptions, logger log.Logger) *Network {
opts.setDefaults()
logger := log.NewNopLogger()
network := &Network{
Nodes: map[types.NodeID]*Node{},
logger: logger,
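Not part of the diff: with the logger now passed explicitly, a test can opt into verbose p2p logging instead of the previously hard-coded NopLogger. A sketch of a call site, with imports as in the surrounding test files:

func TestExampleNetwork(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Pass log.NewNopLogger() instead to keep the test output quiet.
	network := p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: 4},
		log.NewTestingLogger(t))
	_ = network
}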
3 changes: 2 additions & 1 deletion internal/p2p/p2ptest/require.go
@@ -25,7 +25,8 @@ func RequireEmpty(ctx context.Context, t *testing.T, channels ...p2p.Channel) {
count := 0
for iter.Next(ctx) {
count++
require.Nil(t, iter.Envelope())
e := iter.Envelope()
require.Nil(t, e, "received unexpected message %v", e.Message)
}
require.Zero(t, count)
require.Error(t, ctx.Err())
2 changes: 1 addition & 1 deletion internal/p2p/pex/reactor_test.go
@@ -380,7 +380,7 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT

rts := &reactorTestSuite{
logger: log.NewNopLogger().With("testCase", t.Name()),
network: p2ptest.MakeNetwork(ctx, t, networkOpts),
network: p2ptest.MakeNetwork(ctx, t, networkOpts, log.NewNopLogger()),
reactors: make(map[types.NodeID]*pex.Reactor, realNodes),
pexChannels: make(map[types.NodeID]p2p.Channel, opts.TotalNodes),
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, opts.TotalNodes),
9 changes: 6 additions & 3 deletions internal/p2p/pqueue.go
@@ -99,13 +99,16 @@ func newPQScheduler(
logger log.Logger,
m *Metrics,
lc *metricsLabelCache,
chDescs []*ChannelDescriptor,
chDescs map[ChannelID]*ChannelDescriptor,
enqueueBuf, dequeueBuf, capacity uint,
) *pqScheduler {

// copy each ChannelDescriptor and sort them by ascending channel priority
chDescsCopy := make([]*ChannelDescriptor, len(chDescs))
copy(chDescsCopy, chDescs)
chDescsCopy := make([]*ChannelDescriptor, 0, len(chDescs))
for _, chDesc := range chDescs {
chDescsCopy = append(chDescsCopy, chDesc)
}

sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority < chDescsCopy[j].Priority })

var (
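Not part of the diff: since Go randomizes map iteration order, the scheduler must copy the descriptors into a slice and sort by priority to stay deterministic. The same idea in isolation, with hypothetical channel IDs and priorities:

chDescs := map[p2p.ChannelID]*p2p.ChannelDescriptor{
	0x20: {ID: 0x20, Priority: 5},
	0x30: {ID: 0x30, Priority: 1},
}
sorted := make([]*p2p.ChannelDescriptor, 0, len(chDescs))
for _, d := range chDescs {
	sorted = append(sorted, d)
}
sort.Slice(sorted, func(i, j int) bool { return sorted[i].Priority < sorted[j].Priority })
// sorted[0].ID == 0x30 (priority 1) regardless of map iteration order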
4 changes: 2 additions & 2 deletions internal/p2p/pqueue_test.go
@@ -14,8 +14,8 @@ type testMessage = gogotypes.StringValue

func TestCloseWhileDequeueFull(t *testing.T) {
enqueueLength := 5
chDescs := []*ChannelDescriptor{
{ID: 0x01, Priority: 1},
chDescs := map[ChannelID]*ChannelDescriptor{
0x01: {ID: 0x01, Priority: 1},
}
pqueue := newPQScheduler(log.NewNopLogger(), NopMetrics(), newMetricsLabelCache(), chDescs, uint(enqueueLength), 1, 120)

107 changes: 84 additions & 23 deletions internal/p2p/router.go
@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net"
"reflect"
"runtime"
"time"

@@ -151,7 +152,7 @@ type Router struct {
options RouterOptions
privKey crypto.PrivKey
peerManager *PeerManager
chDescs []*ChannelDescriptor
chDescs map[ChannelID]*ChannelDescriptor
transport Transport
endpoint *Endpoint
connTracker connectionTracker
@@ -198,7 +199,7 @@ func NewRouter(
options.MaxIncomingConnectionAttempts,
options.IncomingConnectionWindow,
),
chDescs: make([]*ChannelDescriptor, 0),
chDescs: make(map[ChannelID]*ChannelDescriptor, 0),
transport: transport,
endpoint: endpoint,
peerManager: peerManager,
@@ -256,7 +257,7 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (Ch
if _, ok := r.channelQueues[id]; ok {
return nil, fmt.Errorf("channel %v already exists", id)
}
r.chDescs = append(r.chDescs, chDesc)
r.chDescs[id] = chDesc

queue := r.queueFactory(chDesc.RecvBufferCapacity)
outCh := make(chan Envelope, chDesc.RecvBufferCapacity)
@@ -698,13 +699,19 @@ func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connec
r.metrics.PeersConnected.Add(1)
r.peerManager.Ready(ctx, peerID, channels)

// we use a separate context to manage the lifecycle of the peer connection;
// note that the original ctx will still be used during cleanup
ioCtx, cancel := context.WithCancel(ctx)
defer cancel()

sendQueue := r.getOrMakeQueue(peerID, channels)
defer func() {
r.peerMtx.Lock()
delete(r.peerQueues, peerID)
delete(r.peerChannels, peerID)
r.peerMtx.Unlock()

_ = conn.Close()
sendQueue.close()

r.peerManager.Disconnected(ctx, peerID)
@@ -714,43 +721,68 @@ func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connec
r.logger.Info("peer connected", "peer", peerID, "endpoint", conn)

errCh := make(chan error, 2)
wg := sync.WaitGroup{}
wg.Add(2)

go func() {
select {
case errCh <- r.receivePeer(ctx, peerID, conn):
case <-ctx.Done():
case errCh <- r.receivePeer(ioCtx, peerID, conn):
case <-ioCtx.Done():
}
wg.Done()
}()

go func() {
select {
case errCh <- r.sendPeer(ctx, peerID, conn, sendQueue):
case <-ctx.Done():
case errCh <- r.sendPeer(ioCtx, peerID, conn, sendQueue):
case <-ioCtx.Done():
}
wg.Done()
}()

var err error
// wait for the first error from either goroutine, or for the context to be done

var (
err error
ctxErr error
)

select {
case err = <-errCh:
case <-ctx.Done():
r.logger.Debug("routePeer: received error from subroutine 1", "peer", peerID, "err", err)
case <-ioCtx.Done():
r.logger.Debug("routePeer: ctx done", "peer", peerID)
ctxErr = ioCtx.Err()
}

// one of the I/O goroutines has finished (or the context is done), so we can cancel the context and close everything
cancel()
_ = conn.Close()
sendQueue.close()

select {
case <-ctx.Done():
case e := <-errCh:
// The first err was nil, so we update it with the second err, which may
// or may not be nil.
if err == nil {
err = e
r.logger.Debug("routePeer: closed conn and send queue, waiting for all goroutines to finish", "peer", peerID, "err", err)
wg.Wait()
r.logger.Debug("routePeer: all goroutines finished", "peer", peerID, "err", err)

// Drain the error channel; these should typically not be interesting
FOR:
for {
select {
case e := <-errCh:
r.logger.Debug("routePeer: received error when draining errCh", "peer", peerID, "err", e)
// if we received non-context error, we should return it
if err == nil && !errors.Is(e, context.Canceled) && !errors.Is(e, context.DeadlineExceeded) {
err = e
}
default:
break FOR
}
}
close(errCh)

// if the context was canceled
if e := ctx.Err(); err == nil && e != nil {
err = e
// if the context was canceled, and no other error received on errCh
if err == nil {
err = ctxErr
}

switch err {
@@ -764,18 +796,24 @@ func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connec
// receivePeer receives inbound messages from a peer, deserializes them and
// passes them on to the appropriate channel.
func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Connection) error {
timeout := time.NewTimer(0)
defer timeout.Stop()

for {
chID, bz, err := conn.ReceiveMessage(ctx)
if err != nil {
return err
}

r.channelMtx.RLock()
queue, ok := r.channelQueues[chID]
queue, queueOk := r.channelQueues[chID]
chDesc, chDescOk := r.chDescs[chID]
r.channelMtx.RUnlock()

if !ok {
r.logger.Debug("dropping message for unknown channel", "peer", peerID, "channel", chID)
if !queueOk || !chDescOk {
r.logger.Debug("dropping message for unknown channel",
"peer", peerID, "channel", chID,
"queue", queueOk, "chDesc", chDescOk)
continue
}

@@ -784,14 +822,26 @@ func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Conn
r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err)
continue
}
start := time.Now().UTC()
envelope, err := EnvelopeFromProto(protoEnvelope)
if err != nil {
r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err)
continue
}
envelope.From = peerID
envelope.ChannelID = chID

// stop previous timeout counter and drain the timeout channel
timeout.Stop()
select {
case <-timeout.C:
default:
}

if chDesc.EnqueueTimeout > 0 {
timeout.Reset(chDesc.EnqueueTimeout)
}
start := time.Now().UTC()

select {
case queue.enqueue() <- envelope:
r.metrics.PeerReceiveBytesTotal.With(
Expand All @@ -804,7 +854,18 @@ func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Conn
case <-queue.closed():
r.logger.Debug("channel closed, dropping message", "peer", peerID, "channel", chID)

case <-timeout.C:
r.logger.Debug("dropping message from peer due to enqueue timeout",
"peer", peerID,
"channel", chID,
"channel_name", chDesc.Name,
"timeout", chDesc.EnqueueTimeout.String(),
"type", reflect.TypeOf((envelope.Message)).Name(),
"took", time.Since(start).String(),
)

case <-ctx.Done():
r.logger.Debug("receivePeer: ctx is done", "peer", peerID, "channel", chID)
return nil
}
}
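Not part of the diff: the enqueue-timeout logic in receivePeer reuses a single time.Timer across loop iterations, which requires stopping and draining it before each Reset so a stale tick cannot fire early. A self-contained sketch of that pattern, using a hypothetical string queue in place of the channel queue:

package main

import (
	"context"
	"fmt"
	"time"
)

// enqueueAll mirrors the receivePeer pattern: one reusable timer, stopped and
// drained before each Reset; a message is dropped if it cannot be enqueued
// before the timer fires. A zero timeout never arms the timer, so enqueueing
// blocks until the queue accepts the message or the context finishes.
func enqueueAll(ctx context.Context, queue chan<- string, msgs []string, d time.Duration) (sent, dropped int) {
	timeout := time.NewTimer(0)
	<-timeout.C // consume the initial tick so the timer starts out idle
	defer timeout.Stop()

	for _, msg := range msgs {
		// Stop a possibly-running timer and drain any pending tick so a
		// stale value cannot fire immediately after Reset.
		if !timeout.Stop() {
			select {
			case <-timeout.C:
			default:
			}
		}
		if d > 0 {
			timeout.Reset(d)
		}

		select {
		case queue <- msg:
			sent++
		case <-timeout.C:
			dropped++ // enqueue timed out; the message is silently dropped
		case <-ctx.Done():
			return sent, dropped
		}
	}
	return sent, dropped
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	queue := make(chan string, 1) // room for a single message, never drained
	sent, dropped := enqueueAll(ctx, queue, []string{"tx1", "tx2"}, 10*time.Millisecond)
	fmt.Printf("sent=%d dropped=%d\n", sent, dropped) // sent=1 dropped=1
}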
