Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txpool: buffer so that we dont delete txs #2500

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 78 additions & 3 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ const (

// txReannoMaxNum is the maximum number of transactions a reannounce action can include.
txReannoMaxNum = 1024

maxBufferSize = 1000 // maximum size of tx buffer
)

var (
Expand Down Expand Up @@ -244,6 +246,10 @@ type LegacyPool struct {
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

// A buffer to store transactions that would otherwise be discarded
buffer []*types.Transaction
bufferLock sync.Mutex
}

type txpoolResetRequest struct {
Expand Down Expand Up @@ -355,11 +361,13 @@ func (pool *LegacyPool) loop() {
evict = time.NewTicker(evictionInterval)
reannounce = time.NewTicker(reannounceInterval)
journal = time.NewTicker(pool.config.Rejournal)
readd = time.NewTicker(time.Minute) // ticker to re-add buffered transactions periodically
)
defer report.Stop()
defer evict.Stop()
defer reannounce.Stop()
defer journal.Stop()
defer readd.Stop() // Stop the ticker when the loop exits

// Notify tests that the init phase is done
close(pool.initDoneCh)
Expand Down Expand Up @@ -436,6 +444,9 @@ func (pool *LegacyPool) loop() {
}
pool.mu.Unlock()
}
// Handle re-adding buffered transactions
case <-readd.C:
pool.readdBufferedTransactions()
}
}
}
Expand Down Expand Up @@ -781,12 +792,21 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}
}()
}
// If the transaction pool is full, discard underpriced transactions
// If the transaction pool is full, buffer underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
// If the new transaction is underpriced, buffer it
if !isLocal && pool.priced.Underpriced(tx) {
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
log.Trace("Buffering underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)

pool.bufferLock.Lock()
if len(pool.buffer) < maxBufferSize {
pool.buffer = append(pool.buffer, tx)
} else {
log.Warn("Buffer is full, discarding transaction", "hash", hash)
}
pool.bufferLock.Unlock()

return false, txpool.ErrUnderpriced
}

Expand All @@ -804,6 +824,16 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
// Otherwise if we can't make enough room for new one, abort the operation.
drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)

// Add dropped transactions to the buffer
pool.bufferLock.Lock()
availableSpace := maxBufferSize - len(pool.buffer)
// Determine how many elements to take from drop
if availableSpace > len(drop) {
availableSpace = len(drop)
}
pool.buffer = append(pool.buffer, drop[:availableSpace]...)
pool.bufferLock.Unlock()

// Special case, we still can't make the room for the new remote one.
if !isLocal && !success {
log.Trace("Discarding overflown transaction", "hash", hash)
Expand Down Expand Up @@ -1779,6 +1809,51 @@ func (pool *LegacyPool) SetMaxGas(maxGas uint64) {
pool.maxGas.Store(maxGas)
}

func (pool *LegacyPool) readdBufferedTransactions() {
pool.mu.Lock()
defer pool.mu.Unlock()

// Check if there is space in the pool
if uint64(pool.all.Slots()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
return // No space available, skip re-adding
}

var readded []*types.Transaction

pool.bufferLock.Lock()
for _, tx := range pool.buffer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you do not lock buffermutex here and you should

// Check if adding this transaction will exceed the pool capacity
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
break // Stop if adding the transaction will exceed the pool capacity
}

if _, err := pool.add(tx, false); err == nil {
readded = append(readded, tx)
}
}
pool.bufferLock.Unlock()

// Remove successfully re-added transactions from the buffer
if len(readded) > 0 {
remaining := pool.buffer[:0]
for _, tx := range pool.buffer {
if !containsTransaction(readded, tx) {
remaining = append(remaining, tx)
}
}
pool.buffer = remaining
}
}

func containsTransaction(txs []*types.Transaction, tx *types.Transaction) bool {
for _, t := range txs {
if t.Hash() == tx.Hash() {
return true
}
}
return false
}

// addressByHeartbeat is an account address tagged with its last activity timestamp.
type addressByHeartbeat struct {
address common.Address
Expand Down
Loading