Skip to content

Commit

Permalink
pool: truly discard underpriced, Transfer after lock is over
Browse files Browse the repository at this point in the history
  • Loading branch information
emailtovamos committed Sep 18, 2024
1 parent b818cb7 commit 774e314
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 125 deletions.
66 changes: 20 additions & 46 deletions core/txpool/legacypool/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)

// Helper function to create a dummy transaction of specified size
Expand All @@ -16,18 +17,11 @@ func createDummyTransaction(size int) *types.Transaction {
func TestNewLRUBuffer(t *testing.T) {
capacity := 10
lru := NewLRUBuffer(capacity)
if lru.capacity != capacity {
t.Errorf("expected capacity %d, got %d", capacity, lru.capacity)
}
if lru.buffer.Len() != 0 {
t.Errorf("expected buffer length 0, got %d", lru.buffer.Len())
}
if len(lru.index) != 0 {
t.Errorf("expected index length 0, got %d", len(lru.index))
}
if lru.size != 0 {
t.Errorf("expected size 0, got %d", lru.size)
}

require.Equal(t, capacity, lru.capacity, "expected capacity to match")
require.Zero(t, lru.buffer.Len(), "expected buffer length to be zero")
require.Zero(t, len(lru.index), "expected index length to be zero")
require.Zero(t, lru.size, "expected size to be zero")
}

func TestAddAndGet(t *testing.T) {
Expand All @@ -39,19 +33,15 @@ func TestAddAndGet(t *testing.T) {
lru.Add(tx1)
lru.Add(tx2)

if lru.Size() != 2 {
t.Errorf("expected size 2, got %d", lru.Size())
}
require.Equal(t, 2, lru.Size(), "expected size to be 2")

retrievedTx, ok := lru.Get(tx1.Hash())
if !ok || retrievedTx.Hash() != tx1.Hash() {
t.Errorf("failed to retrieve tx1")
}
require.True(t, ok, "expected to retrieve tx1")
require.Equal(t, tx1.Hash(), retrievedTx.Hash(), "retrieved tx1 hash does not match")

retrievedTx, ok = lru.Get(tx2.Hash())
if !ok || retrievedTx.Hash() != tx2.Hash() {
t.Errorf("failed to retrieve tx2")
}
require.True(t, ok, "expected to retrieve tx2")
require.Equal(t, tx2.Hash(), retrievedTx.Hash(), "retrieved tx2 hash does not match")
}

func TestBufferCapacity(t *testing.T) {
Expand All @@ -64,19 +54,13 @@ func TestBufferCapacity(t *testing.T) {
lru.Add(tx1)
lru.Add(tx2)

if lru.Size() != 2 {
t.Errorf("expected size 2, got %d", lru.Size())
}
require.Equal(t, 2, lru.Size(), "expected size to be 2")

lru.Add(tx3)

if lru.Size() != 2 {
t.Errorf("expected size 2 after adding tx3, got %d", lru.Size())
}

if _, ok := lru.Get(tx1.Hash()); ok {
t.Errorf("expected tx1 to be evicted")
}
require.Equal(t, 2, lru.Size(), "expected size to remain 2 after adding tx3")
_, ok := lru.Get(tx1.Hash())
require.False(t, ok, "expected tx1 to be evicted")
}

func TestFlush(t *testing.T) {
Expand All @@ -92,15 +76,11 @@ func TestFlush(t *testing.T) {

flushedTxs := lru.Flush(2)

if len(flushedTxs) != 2 {
t.Errorf("expected to flush 2 transactions, got %d", len(flushedTxs))
}
require.Len(t, flushedTxs, 2, "expected to flush 2 transactions")

expectedSize := 1
actualSize := lru.Size()
if expectedSize != actualSize {
t.Errorf("expected size after flush %d, got %d", expectedSize, actualSize)
}
require.Equal(t, expectedSize, actualSize, "expected size after flush to match")
}

func TestSize(t *testing.T) {
Expand All @@ -110,17 +90,11 @@ func TestSize(t *testing.T) {
tx2 := createDummyTransaction(1500) // 2 slots

lru.Add(tx1)
if lru.Size() != 1 {
t.Errorf("expected size 1, got %d", lru.Size())
}
require.Equal(t, 1, lru.Size(), "expected size to be 1")

lru.Add(tx2)
if lru.Size() != 2 {
t.Errorf("expected size 2, got %d", lru.Size())
}
require.Equal(t, 2, lru.Size(), "expected size to be 2")

lru.Flush(1)
if lru.Size() != 1 {
t.Errorf("expected size 1 after flush, got %d", lru.Size())
}
require.Equal(t, 1, lru.Size(), "expected size to be 1 after flush")
}
97 changes: 47 additions & 50 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,8 @@ type Config struct {
Pool2Slots uint64 // Maximum number of transaction slots in pool 2
Pool3Slots uint64 // Maximum number of transaction slots in pool 3

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
ReannounceTime time.Duration // Duration for announcing local pending transactions again
InterPoolTransferTime time.Duration // Attempt to transfer from pool3 to pool2 every this much time
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
ReannounceTime time.Duration // Duration for announcing local pending transactions again
}

// DefaultConfig contains the default configurations for the transaction pool.
Expand All @@ -163,9 +162,8 @@ var DefaultConfig = Config{
Pool2Slots: 1024,
Pool3Slots: 1024,

Lifetime: 3 * time.Hour,
ReannounceTime: 10 * 365 * 24 * time.Hour,
InterPoolTransferTime: time.Minute,
Lifetime: 3 * time.Hour,
ReannounceTime: 10 * 365 * 24 * time.Hour,
}

// sanitize checks the provided user configurations and changes anything that's
Expand Down Expand Up @@ -200,10 +198,7 @@ func (config *Config) sanitize() Config {
log.Warn("Sanitizing invalid txpool global queue", "provided", conf.GlobalQueue, "updated", DefaultConfig.GlobalQueue)
conf.GlobalQueue = DefaultConfig.GlobalQueue
}
if conf.Pool3Slots < 1 {
log.Warn("Sanitizing invalid txpool pool 3 slots", "provided", conf.Pool3Slots, "updated", DefaultConfig.Pool3Slots)
conf.Pool3Slots = DefaultConfig.Pool3Slots
}

if conf.Lifetime < 1 {
log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultConfig.Lifetime)
conf.Lifetime = DefaultConfig.Lifetime
Expand Down Expand Up @@ -306,8 +301,6 @@ func New(config Config, chain BlockChain) *LegacyPool {
pool.journal = newTxJournal(config.Journal)
}

//pool.startPeriodicTransfer(config.InterPoolTransferTime)

return pool
}

Expand Down Expand Up @@ -544,6 +537,17 @@ func (pool *LegacyPool) Stats() (int, int) {
return pool.stats()
}

func (pool *LegacyPool) StatsPool3() int {
pool.mu.RLock()
defer pool.mu.RUnlock()

if pool.localBufferPool == nil {
return 0
}

return pool.localBufferPool.Size()
}

// stats retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions.
func (pool *LegacyPool) stats() (int, int) {
Expand Down Expand Up @@ -780,14 +784,6 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
maxPool1Size := pool.config.GlobalSlots + pool.config.GlobalQueue
maxPool2Size := pool.config.Pool2Slots
txPoolSizeAfterCurrentTx := uint64(pool.all.Slots() + numSlots(tx))
var includePool1, includePool2, includePool3 bool
if txPoolSizeAfterCurrentTx <= maxPool1Size {
includePool1 = true
} else if (txPoolSizeAfterCurrentTx > maxPool1Size) && (txPoolSizeAfterCurrentTx <= (maxPool1Size + maxPool2Size)) {
includePool2 = true
} else {
includePool3 = true
}

// Make the local flag. If it's from local source or it's from the network but
// the sender is marked as local previously, treat it as the local transaction.
Expand Down Expand Up @@ -829,15 +825,6 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
if txPoolSizeAfterCurrentTx > (maxPool1Size + maxPool2Size) {
// If the new transaction is underpriced, don't accept it
if !isLocal && pool.priced.Underpriced(tx) {
addedToAnyPool, err := pool.addToPool12OrPool3(tx, from, isLocal, includePool1, includePool2, includePool3)
if addedToAnyPool {
//return false, txpool.ErrUnderpricedTransferredtoAnotherPool // The reserve code expects named error formatting
return false, nil
}
if err != nil {
log.Error("Error while trying to add to pool2 or pool3", "error", err)
}

log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
return false, txpool.ErrUnderpriced
Expand Down Expand Up @@ -885,22 +872,8 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
return false, txpool.ErrFutureReplacePending // todo 1 maybe in this case the future transaction can be part of pool3?
}
}

// calculate total number of slots in drop. Accordingly add them to pool3 (if there is space)
// all members of drop will be dropped from pool1/2 regardless of whether they get added to pool3 or not
availableSlotsPool3 := pool.availableSlotsPool3()
if availableSlotsPool3 > 0 {
// transfer availableSlotsPool3 number of transactions slots from drop to pool3
currentSlotsUsed := 0
for _, tx := range drop {
txSlots := numSlots(tx)
if currentSlotsUsed+txSlots <= availableSlotsPool3 {
from, _ := types.Sender(pool.signer, tx)
pool.addToPool12OrPool3(tx, from, isLocal, false, false, true)
currentSlotsUsed += txSlots
}
}
}

Check failure on line 875 in core/txpool/legacypool/legacypool.go

View workflow job for this annotation

GitHub Actions / golang-lint (1.21.x, ubuntu-latest)

File is not `goimports`-ed (goimports)
pool.addToPool3(drop, isLocal)

// Kick out the underpriced remote transactions.
for _, tx := range drop {
Expand All @@ -917,7 +890,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
// Try to replace an existing transaction in the pending pool
if list := pool.pending[from]; list != nil && list.Contains(tx.Nonce()) {
// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.config.PriceBump, includePool2)
inserted, old := list.Add(tx, pool.config.PriceBump, false)
if !inserted {
pendingDiscardMeter.Mark(1)
return false, txpool.ErrReplaceUnderpriced
Expand All @@ -940,7 +913,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}

// New transaction isn't replacing a pending one, push into queue
replaced, err = pool.enqueueTx(hash, tx, isLocal, true, includePool2) // At this point pool1 can incorporate this. So no need for pool2 or pool3
replaced, err = pool.enqueueTx(hash, tx, isLocal, true, true) // At this point pool1 can incorporate this. So no need for pool2 or pool3
if err != nil {
return false, err
}
Expand All @@ -960,6 +933,24 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
return replaced, nil
}

func (pool *LegacyPool) addToPool3(drop types.Transactions, isLocal bool) {
// calculate total number of slots in drop. Accordingly add them to pool3 (if there is space)
// all members of drop will be dropped from pool1/2 regardless of whether they get added to pool3 or not
availableSlotsPool3 := pool.availableSlotsPool3()
if availableSlotsPool3 > 0 {
// transfer availableSlotsPool3 number of transactions slots from drop to pool3
currentSlotsUsed := 0
for _, tx := range drop {
txSlots := numSlots(tx)
if currentSlotsUsed+txSlots <= availableSlotsPool3 {
from, _ := types.Sender(pool.signer, tx)
pool.addToPool12OrPool3(tx, from, isLocal, false, false, true)
currentSlotsUsed += txSlots
}
}
}
}

// addToPool12OrPool3 adds a transaction to pool1 or pool2 or pool3 depending on which one is asked for
func (pool *LegacyPool) addToPool12OrPool3(tx *types.Transaction, from common.Address, isLocal bool, pool1, pool2, pool3 bool) (bool, error) {
if pool1 {
Expand Down Expand Up @@ -1486,8 +1477,8 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
}
}

// Transfer transactions from pool3 to pool2 for new block import
pool.transferTransactions()
//// Transfer transactions from pool3 to pool2 for new block import
//pool.transferTransactions()

// Check for pending transactions for every account that sent new ones
promoted := pool.promoteExecutables(promoteAddrs)
Expand Down Expand Up @@ -1521,6 +1512,9 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
pool.changesSinceReorg = 0 // Reset change counter
pool.mu.Unlock()

// Transfer transactions from pool3 to pool2 for new block import
pool.transferTransactions()

// Notify subsystems for newly added transactions
for _, tx := range promoted {
addr, _ := types.Sender(pool.signer, tx)
Expand All @@ -1542,11 +1536,13 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
}
// Send static transactions
if len(staticTxs) > 0 {
fmt.Println("New txevent emitted for static ", staticTxs[0].Hash())
pool.txFeed.Send(core.NewTxsEvent{Txs: staticTxs, Static: true})
}

// Send dynamic transactions
if len(nonStaticTxs) > 0 {
fmt.Println("New txevent emitted for non static ", nonStaticTxs[0].Hash())
pool.txFeed.Send(core.NewTxsEvent{Txs: nonStaticTxs, Static: false})
}
}
Expand Down Expand Up @@ -2255,7 +2251,8 @@ func (pool *LegacyPool) availableSlotsPool3() int {
func (pool *LegacyPool) printTxStats() {
for _, l := range pool.pending {
for _, transaction := range l.txs.items {
fmt.Println("Pending:", transaction.Hash().String(), transaction.GasFeeCap(), transaction.GasTipCap())
from, _ := types.Sender(pool.signer, transaction)
fmt.Println("from: ", from, " Pending:", transaction.Hash().String(), transaction.GasFeeCap(), transaction.GasTipCap())
}
}

Expand Down
Loading

0 comments on commit 774e314

Please sign in to comment.