Skip to content

Commit

Permalink
fix(mempool): cancel previous mempool run when starting new one (#760)
Browse files Browse the repository at this point in the history
* feat(p2p): throttled channel

(cherry picked from commit 5e41958)

* feat: limit mempool broadcast to 5/s

(cherry picked from commit cdde233)

* feat(p2p): channel recv rate limiting - not tested

(cherry picked from commit f7f7ce7)

* feat(p2p): channel recv rate limiting - continued

(cherry picked from commit 54e00f7)

* chore(p2p): regen channel mocks

* feat(config): mempool tx-send-rate-limit, tx-recv-rate-limit, tx-recv-rate-punish-peer

* chore: lint

* chore(mempool): burst recv twice as big as burst send

* chore: lint

* chore: remove not needed log

* chore: fixes after merge

* refactor(p2p): move chan descs to p2p.channel_params and tune priorities

* chore: fix after merge

* chore(statesync): fix linter issues - remove unused consts

* fix(mempool): cancel previous mempool run when starting new one

* test(mempool): no parallel recheck tx task groups

* test(mepmpool): increase TestTxMempool_OneRecheckTxAtTime timeout

* chore: improve comment

* test(mempool): fix TestTxMempool_OneRecheckTxAtTime
  • Loading branch information
lklimek authored Mar 12, 2024
1 parent 9028d25 commit ebe4523
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 0 deletions.
13 changes: 13 additions & 0 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ type TxMempool struct {
txs *clist.CList // valid transactions (passed CheckTx)
txByKey map[types.TxKey]*clist.CElement
txBySender map[string]*clist.CElement // for sender != ""

// cancellation function for recheck txs tasks
recheckCancel context.CancelFunc
}

// NewTxMempool constructs a new, empty priority mempool at the specified
Expand Down Expand Up @@ -719,6 +722,12 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.Respons
// Precondition: The mempool is not empty.
// The caller must hold txmp.mtx exclusively.
func (txmp *TxMempool) recheckTransactions(ctx context.Context) {
// cancel previous recheck if it is still running
if txmp.recheckCancel != nil {
txmp.recheckCancel()
}
ctx, txmp.recheckCancel = context.WithCancel(ctx)

if txmp.Size() == 0 {
panic("mempool: cannot run recheck on an empty mempool")
}
Expand All @@ -742,6 +751,10 @@ func (txmp *TxMempool) recheckTransactions(ctx context.Context) {
for _, wtx := range wtxs {
wtx := wtx
start(func() error {
if err := ctx.Err(); err != nil {
txmp.logger.Trace("recheck txs task canceled", "err", err, "tx", wtx.hash)
return err
}
rsp, err := txmp.proxyAppConn.CheckTx(ctx, &abci.RequestCheckTx{
Tx: wtx.tx,
Type: abci.CheckTxType_Recheck,
Expand Down
89 changes: 89 additions & 0 deletions internal/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,29 @@ import (
"fmt"
"math/rand"
"os"
"runtime"
"sort"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/fortytw2/leaktest"
sync "github.com/sasha-s/go-deadlock"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

abciclient "github.com/dashpay/tenderdash/abci/client"
"github.com/dashpay/tenderdash/abci/example/code"
"github.com/dashpay/tenderdash/abci/example/kvstore"
abci "github.com/dashpay/tenderdash/abci/types"
"github.com/dashpay/tenderdash/abci/types/mocks"
"github.com/dashpay/tenderdash/config"
"github.com/dashpay/tenderdash/libs/log"
tmrand "github.com/dashpay/tenderdash/libs/rand"
"github.com/dashpay/tenderdash/types"
)

Expand Down Expand Up @@ -739,6 +745,89 @@ func TestTxMempool_CheckTxPostCheckError(t *testing.T) {
}
}

// TestTxMempool_OneRecheckTxAtTime checks if previous recheckTransactions task is canceled when another one is started.
//
// Given mempool with some transactions AND app that processes CheckTX very slowly,
// when we call recheckTransactions() twice,
// then first recheckTransactions task is canceled and second one starts from the beginning.
func TestTxMempool_OneRecheckTxAtTime(t *testing.T) {
// SETUP
t.Cleanup(leaktest.Check(t))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logger := log.NewTestingLogger(t)

// num of parallel tasks started in recheckTransactions; this is how many
// txs will be processed as a minimum
numRecheckTasks := 2 * runtime.NumCPU()
numTxs := 3 * numRecheckTasks

app := mocks.NewApplication(t)
var (
checkTxCounter atomic.Uint32
recheckTxBlocker sync.Mutex
)
// app will wait on recheckTxBlocker until we unblock it
app.On("CheckTx", mock.Anything, mock.Anything).Return(&abci.ResponseCheckTx{
Priority: 1,
Code: abci.CodeTypeOK}, nil).
Run(func(_ mock.Arguments) {
// increase counter before locking, so we can check if it was called
checkTxCounter.Add(1)
recheckTxBlocker.Lock()
defer recheckTxBlocker.Unlock()
})

client := abciclient.NewLocalClient(log.NewNopLogger(), app)
cfg := config.TestConfig()
mp := NewTxMempool(logger, cfg.Mempool, client)
// add some txs to mempool
for i := 0; i < numTxs; i++ {
err := mp.addNewTransaction(randomTx(), &abci.ResponseCheckTx{Code: abci.CodeTypeOK, GasWanted: 1, Priority: int64(i + 1)})
require.NoError(t, err)
}

// TEST

// block checkTx until we unblock it
recheckTxBlocker.Lock()
// start recheckTransactions in the background; it should process exactly one tx per recheck task
mp.recheckTransactions(ctx)
assert.Eventually(t,
func() bool { return checkTxCounter.Load() == uint32(numRecheckTasks) },
200*time.Millisecond, 10*time.Millisecond,
"1st run: processed %d txs, expected %d", checkTxCounter.Load(), numRecheckTasks)

// another recheck should cancel the first run and start from the beginning , but pending checkTx ops should finish
mp.recheckTransactions(ctx)
// unlock the app; this should finish all started rechecks, but not continue with rechecks from 1st run
recheckTxBlocker.Unlock()
// Ensure that all goroutines/tasks have finished
assert.Eventually(t, func() bool { return uint32(numRecheckTasks+numTxs) == checkTxCounter.Load() },
200*time.Millisecond, 10*time.Millisecond,
"num of txs mismatch: got %d, expected %d", checkTxCounter.Load(), numRecheckTasks+numTxs)

// let's give it some more time and ensure we don't process any further txs
if !testing.Short() {
time.Sleep(100 * time.Millisecond)
assert.Equal(t, uint32(numRecheckTasks+numTxs), checkTxCounter.Load())
}
}

func randomTx() *WrappedTx {
tx := tmrand.Bytes(10)
return &WrappedTx{
tx: tx,
height: 1,
timestamp: time.Now(),
gasWanted: 1,
priority: 1,
peers: map[uint16]bool{},
}
}

func mustKvStore(t *testing.T, opts ...kvstore.OptFunc) *kvstore.Application {
opts = append(opts, kvstore.WithLogger(log.NewTestingLogger(t).With("module", "kvstore")))
app, err := kvstore.NewMemoryApp(opts...)
Expand Down

0 comments on commit ebe4523

Please sign in to comment.