Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/txpool/legacypool: add overflowpool for txs #2660

Merged
merged 59 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
dfb046e
txpool: incomplete pool2 and pool3
emailtovamos Jul 16, 2024
e30248b
pool: optimise sending in reorg
emailtovamos Jul 29, 2024
f80ac01
pool: add static info and simplify transfer
emailtovamos Aug 1, 2024
8655d30
pool: remove comments, use queue not pool23
emailtovamos Aug 1, 2024
4538e92
pool: remove unused function
emailtovamos Aug 1, 2024
4369e3d
pool: refactor and bugfix
emailtovamos Aug 1, 2024
d0d6a27
pool: buffer test and size logic
emailtovamos Aug 2, 2024
5faf413
pool: add discarded ones to pool3 by default.
emailtovamos Aug 2, 2024
5bb78b3
pool: minor refactor
emailtovamos Aug 20, 2024
6b4e16b
pool: make slots config
emailtovamos Aug 21, 2024
d03f7e5
pool: initialise pool3 slots
emailtovamos Aug 21, 2024
94a60a9
pool: add underpriced to pool2 or 3
emailtovamos Aug 23, 2024
ed2d1d7
pool: enqueue in pool2 & drop properly
emailtovamos Aug 25, 2024
6daecfb
pool: bugfix:always drop drop and pool2 size
emailtovamos Aug 27, 2024
9d7298f
pool: TestDualHeapEviction passing partly
emailtovamos Aug 27, 2024
a1a25e9
pool: TestDualHeapEviction fully pass
emailtovamos Aug 27, 2024
253d9a5
pool: some cleanups
emailtovamos Aug 27, 2024
0692a99
pool: fix the TestTransactionFutureAttack test
emailtovamos Aug 28, 2024
40dcfcd
pool: cleanup debug logs
emailtovamos Aug 28, 2024
6673f3e
pool: fix TestUnderpricingDynamicFee based on new pool
emailtovamos Aug 28, 2024
ebd8f59
pool: fix all old tests
emailtovamos Aug 28, 2024
bdb4cc2
pool: lint
emailtovamos Aug 29, 2024
e45e7eb
pool: include static in flatten
emailtovamos Aug 29, 2024
069eaf2
pool: proper use of AsyncSendPooledTransactionHashes
emailtovamos Aug 29, 2024
70ece93
pool: flags for pool2 and 3 capacity
emailtovamos Aug 29, 2024
e7d0a16
pool: fix test as now by default pool2 and pool3 aren't empty
emailtovamos Aug 30, 2024
16a2a53
Merge remote-tracking branch 'origin/develop' into txpool-new
emailtovamos Sep 2, 2024
0f8a1b5
pool: test for transfer
emailtovamos Sep 3, 2024
76d157d
pool: set transfer time in config
emailtovamos Sep 3, 2024
aeec0c7
pool: remove unused criticalpathpool
emailtovamos Sep 3, 2024
53042e1
buffer: make private
emailtovamos Sep 3, 2024
8e6833c
pool: bug fix and test fix
emailtovamos Sep 3, 2024
5f398db
pool: pool2 can have 0 size
emailtovamos Sep 3, 2024
706a24e
pool: lint fix
emailtovamos Sep 4, 2024
0e61543
test: requestPromoteExecutables after every enqueue for testing
emailtovamos Sep 6, 2024
0a5dbef
pool: queued goes to 0 locally after this change
emailtovamos Sep 6, 2024
248bb6b
pool: fastcache, interface, metrics modify
emailtovamos Sep 11, 2024
cf10c5c
eth: send to some peers of pool2, not just static
emailtovamos Sep 11, 2024
b818cb7
pool: transfer on block import and simplify it
emailtovamos Sep 12, 2024
774e314
pool: truly discard underpriced, Transfer after lock is over
emailtovamos Sep 18, 2024
629af6d
pool: else ifs instead of ifs
emailtovamos Sep 18, 2024
3e3c56b
pool: address minor issues
emailtovamos Sep 19, 2024
0957562
pool: heap map as pool3
emailtovamos Sep 25, 2024
846e55b
pool: remove pool2 from legacypool
emailtovamos Sep 25, 2024
6a6e09c
pool: remove pool2 related code
emailtovamos Sep 26, 2024
355dee9
pool: edit tests and remove remaining pool2 logic
emailtovamos Sep 26, 2024
1ad40cd
pool: add back removed logic
emailtovamos Sep 26, 2024
d5b10e0
pool: remove fastcache which is no longer required
emailtovamos Sep 26, 2024
4e69ac4
pool: remove event and debug logs
emailtovamos Sep 26, 2024
31c9465
pool: remove old buffer for pool3
emailtovamos Sep 26, 2024
a8959fe
pool: minor refactors
emailtovamos Sep 26, 2024
9c72c02
pool: preallocate pool3
emailtovamos Sep 26, 2024
5d44ba9
pool: remove sequence, more sturdy, maxsize
emailtovamos Oct 1, 2024
7a929d6
pool: refactor to overflow pool
emailtovamos Oct 14, 2024
8170d99
Merge branch 'develop' into txpool-new
emailtovamos Oct 14, 2024
0e67514
pool: remove debug logs
emailtovamos Oct 14, 2024
f41bb13
pool: refactoring, addressing comments
emailtovamos Oct 15, 2024
0dd0bd7
pool: remove extra new lines
emailtovamos Oct 15, 2024
60bdc25
pool: fail fast, disable by default, no interface
emailtovamos Oct 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ var (
utils.TxPoolGlobalSlotsFlag,
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolPool2SlotsFlag,
emailtovamos marked this conversation as resolved.
Show resolved Hide resolved
utils.TxPoolPool3SlotsFlag,
emailtovamos marked this conversation as resolved.
Show resolved Hide resolved
utils.TxPoolLifetimeFlag,
utils.TxPoolReannounceTimeFlag,
utils.BlobPoolDataDirFlag,
Expand Down
20 changes: 20 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,18 @@ var (
Value: ethconfig.Defaults.TxPool.GlobalQueue,
Category: flags.TxPoolCategory,
}
TxPoolPool2SlotsFlag = &cli.Uint64Flag{
Name: "txpool.pool2slots",
Usage: "Maximum number of transaction slots in pool 2",
Value: ethconfig.Defaults.TxPool.Pool2Slots,
Category: flags.TxPoolCategory,
}
TxPoolPool3SlotsFlag = &cli.Uint64Flag{
Name: "txpool.pool3slots",
Usage: "Maximum number of transaction slots in pool 3",
Value: ethconfig.Defaults.TxPool.Pool3Slots,
Category: flags.TxPoolCategory,
}
TxPoolLifetimeFlag = &cli.DurationFlag{
Name: "txpool.lifetime",
Usage: "Maximum amount of time non-executable transaction are queued",
Expand Down Expand Up @@ -1762,6 +1774,12 @@ func setTxPool(ctx *cli.Context, cfg *legacypool.Config) {
if ctx.IsSet(TxPoolGlobalQueueFlag.Name) {
cfg.GlobalQueue = ctx.Uint64(TxPoolGlobalQueueFlag.Name)
}
if ctx.IsSet(TxPoolPool2SlotsFlag.Name) {
cfg.Pool2Slots = ctx.Uint64(TxPoolPool2SlotsFlag.Name)
}
if ctx.IsSet(TxPoolPool3SlotsFlag.Name) {
cfg.Pool3Slots = ctx.Uint64(TxPoolPool3SlotsFlag.Name)
}
if ctx.IsSet(TxPoolLifetimeFlag.Name) {
cfg.Lifetime = ctx.Duration(TxPoolLifetimeFlag.Name)
}
Expand Down Expand Up @@ -2292,6 +2310,8 @@ func EnableNodeInfo(poolConfig *legacypool.Config, nodeInfo *p2p.NodeInfo) Setup
"GlobalSlots": poolConfig.GlobalSlots,
"AccountQueue": poolConfig.AccountQueue,
"GlobalQueue": poolConfig.GlobalQueue,
"Pool2Slots": poolConfig.Pool2Slots,
"Pool3Slots": poolConfig.Pool3Slots,
"Lifetime": poolConfig.Lifetime,
})
}
Expand Down
8 changes: 7 additions & 1 deletion core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ import (
)

// NewTxsEvent is posted when a batch of transactions enters the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }
type NewTxsEvent struct {
Txs []*types.Transaction
// Static bool is Whether to send to only Static peer or not.
// This is because at high traffic we still want to broadcast transactions to at least some peers so that we
// minimize the transaction lost.
Static bool
zzzckck marked this conversation as resolved.
Show resolved Hide resolved
}

// ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration.
type ReannoTxsEvent struct{ Txs []*types.Transaction }
Expand Down
4 changes: 4 additions & 0 deletions core/txpool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ var (
// configured for the transaction pool.
ErrUnderpriced = errors.New("transaction underpriced")

// ErrUnderpricedTransferredtoAnotherPool is returned if a transaction's gas price is below the minimum
// configured for the transaction pool.
ErrUnderpricedTransferredtoAnotherPool = errors.New("transaction underpriced, so it is either in pool2 or pool3")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need this error? I think underpriced transaction can be simply discarded


// ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
// with a different one without the required price bump.
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
Expand Down
115 changes: 115 additions & 0 deletions core/txpool/legacypool/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package legacypool

import (
containerList "container/list"
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

type LRUBuffer struct {
Copy link
Collaborator

@zzzckck zzzckck Sep 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion-2:
maybe we can just use the fastcache instead? it is a RingBuffer, not LRU, RingBuffer has some advantages, you may refer: https://github.com/bnb-chain/bsc/blob/master/core/state/snapshot/disklayer.go#L36

Fastcache may not work directly, as it does not support iteration. We can add a list of TxHash to support the iteration.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also suggest here to use well tested lib, like github.com/ethereum/go-ethereum/common/lru, github.com/VictoriaMetrics/fastcache, etc. They have less bug and better performance.

capacity int
buffer *containerList.List
index map[common.Hash]*containerList.Element
mu sync.Mutex
size int // Total number of slots used
}

func NewLRUBuffer(capacity int) *LRUBuffer {
return &LRUBuffer{
capacity: capacity,
buffer: containerList.New(),
index: make(map[common.Hash]*containerList.Element),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should pre-allocate this to max size or some lvl e.g. 1/2 of capacity

size: 0, // Initialize size to 0
}
}

func (lru *LRUBuffer) Add(tx *types.Transaction) {
lru.mu.Lock()
defer lru.mu.Unlock()

if elem, ok := lru.index[tx.Hash()]; ok {
lru.buffer.MoveToFront(elem)
return
}

txSlots := numSlots(tx)

// Remove elements until there is enough capacity
for lru.size+txSlots > lru.capacity && lru.buffer.Len() > 0 {
zzzckck marked this conversation as resolved.
Show resolved Hide resolved
back := lru.buffer.Back()
removedTx := back.Value.(*types.Transaction)
lru.buffer.Remove(back)
delete(lru.index, removedTx.Hash())
lru.size -= numSlots(removedTx) // Decrease size by the slots of the removed transaction
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if deleting one is not enough because of # of slots?

}

elem := lru.buffer.PushFront(tx)
lru.index[tx.Hash()] = elem
lru.size += txSlots // Increase size by the slots of the new transaction
// Update pool3Gauge
pool3Gauge.Inc(1)
}

func (lru *LRUBuffer) Get(hash common.Hash) (*types.Transaction, bool) {
lru.mu.Lock()
defer lru.mu.Unlock()

if elem, ok := lru.index[hash]; ok {
lru.buffer.MoveToFront(elem)
return elem.Value.(*types.Transaction), true
}
return nil, false
}

func (lru *LRUBuffer) Flush(maxTransactions int) []*types.Transaction {
lru.mu.Lock()
defer lru.mu.Unlock()

txs := make([]*types.Transaction, 0, maxTransactions)
count := 0
for count < maxTransactions && lru.buffer.Len() > 0 {
back := lru.buffer.Back()
removedTx := back.Value.(*types.Transaction)
txs = append(txs, removedTx)
lru.buffer.Remove(back)
delete(lru.index, removedTx.Hash())
lru.size -= numSlots(removedTx) // Decrease size by the slots of the removed transaction
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the same as in if before, maybe it would be nicer to have it under one private function

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the same as in if before, maybe it would be nicer to have it under one private function

+1

count++
// Update pool3Gauge
pool3Gauge.Dec(1)
}
return txs
}

// New method to get the current size of the buffer in terms of slots
func (lru *LRUBuffer) Size() int {
lru.mu.Lock()
defer lru.mu.Unlock()
return lru.size
}

// New iterator method to iterate over all transactions, ONLY used for printing and debugging
func (lru *LRUBuffer) iterate() <-chan *types.Transaction {
ch := make(chan *types.Transaction)
go func() {
lru.mu.Lock()
defer lru.mu.Unlock()
defer close(ch)

for e := lru.buffer.Front(); e != nil; e = e.Next() {
ch <- e.Value.(*types.Transaction)
}
}()
return ch
}

func (lru *LRUBuffer) PrintTxStats() {
// Iterating over the transactions
for tx := range lru.iterate() {
// Print transaction details or process them as needed
fmt.Println(tx.Hash().String(), tx.GasFeeCap().String(), tx.GasTipCap().String())
}
}
100 changes: 100 additions & 0 deletions core/txpool/legacypool/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package legacypool

import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)

// Helper function to create a dummy transaction of specified size
func createDummyTransaction(size int) *types.Transaction {
data := make([]byte, size)
return types.NewTransaction(0, common.Address{}, nil, 0, nil, data)
}

func TestNewLRUBuffer(t *testing.T) {
capacity := 10
lru := NewLRUBuffer(capacity)

require.Equal(t, capacity, lru.capacity, "expected capacity to match")
require.Zero(t, lru.buffer.Len(), "expected buffer length to be zero")
require.Zero(t, len(lru.index), "expected index length to be zero")
require.Zero(t, lru.size, "expected size to be zero")
}

func TestAddAndGet(t *testing.T) {
lru := NewLRUBuffer(10)

tx1 := createDummyTransaction(500)
tx2 := createDummyTransaction(1500)

lru.Add(tx1)
lru.Add(tx2)

require.Equal(t, 2, lru.Size(), "expected size to be 2")

retrievedTx, ok := lru.Get(tx1.Hash())
require.True(t, ok, "expected to retrieve tx1")
require.Equal(t, tx1.Hash(), retrievedTx.Hash(), "retrieved tx1 hash does not match")

retrievedTx, ok = lru.Get(tx2.Hash())
require.True(t, ok, "expected to retrieve tx2")
require.Equal(t, tx2.Hash(), retrievedTx.Hash(), "retrieved tx2 hash does not match")
}

func TestBufferCapacity(t *testing.T) {
lru := NewLRUBuffer(2) // Capacity in slots

tx1 := createDummyTransaction(500) // 1 slot
tx2 := createDummyTransaction(1500) // 1 slot
tx3 := createDummyTransaction(1000) // 1 slot

lru.Add(tx1)
lru.Add(tx2)

require.Equal(t, 2, lru.Size(), "expected size to be 2")

lru.Add(tx3)

require.Equal(t, 2, lru.Size(), "expected size to remain 2 after adding tx3")
_, ok := lru.Get(tx1.Hash())
require.False(t, ok, "expected tx1 to be evicted")
}

func TestFlush(t *testing.T) {
lru := NewLRUBuffer(10)

tx1 := createDummyTransaction(500)
tx2 := createDummyTransaction(1500)
tx3 := createDummyTransaction(1000)

lru.Add(tx1)
lru.Add(tx2)
lru.Add(tx3)

flushedTxs := lru.Flush(2)

require.Len(t, flushedTxs, 2, "expected to flush 2 transactions")

expectedSize := 1
actualSize := lru.Size()
require.Equal(t, expectedSize, actualSize, "expected size after flush to match")
}

func TestSize(t *testing.T) {
lru := NewLRUBuffer(10)

tx1 := createDummyTransaction(500) // 1 slot
tx2 := createDummyTransaction(1500) // 2 slots

lru.Add(tx1)
require.Equal(t, 1, lru.Size(), "expected size to be 1")

lru.Add(tx2)
require.Equal(t, 2, lru.Size(), "expected size to be 2")

lru.Flush(1)
require.Equal(t, 1, lru.Size(), "expected size to be 1 after flush")
}
Loading
Loading