Skip to content

Commit

Permalink
Merge pull request #2173 from OffchainLabs/batchposter-l1price-metrics
Browse files Browse the repository at this point in the history
Add metrics to track L1 price in batch poster [NIT-1248]
  • Loading branch information
joshuacolvin0 authored Mar 28, 2024
2 parents ff9e72a + 066b9fa commit d28682b
Show file tree
Hide file tree
Showing 9 changed files with 416 additions and 43 deletions.
66 changes: 64 additions & 2 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,18 @@ import (
)

var (
batchPosterWalletBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/wallet/balanceether", nil)
batchPosterGasRefunderBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/gasrefunder/balanceether", nil)
batchPosterWalletBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/wallet/eth", nil)
batchPosterGasRefunderBalance = metrics.NewRegisteredGaugeFloat64("arb/batchposter/gasrefunder/eth", nil)
baseFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/basefee", nil)
blobFeeGauge = metrics.NewRegisteredGauge("arb/batchposter/blobfee", nil)
l1GasPriceGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gasprice", nil)
l1GasPriceEstimateGauge = metrics.NewRegisteredGauge("arb/batchposter/l1gasprice/estimate", nil)
latestBatchSurplusGauge = metrics.NewRegisteredGauge("arb/batchposter/latestbatchsurplus", nil)
blockGasUsedGauge = metrics.NewRegisteredGauge("arb/batchposter/blockgas/used", nil)
blockGasLimitGauge = metrics.NewRegisteredGauge("arb/batchposter/blockgas/limit", nil)
blobGasUsedGauge = metrics.NewRegisteredGauge("arb/batchposter/blobgas/used", nil)
blobGasLimitGauge = metrics.NewRegisteredGauge("arb/batchposter/blobgas/limit", nil)
suggestedTipCapGauge = metrics.NewRegisteredGauge("arb/batchposter/suggestedtipcap", nil)

usableBytesInBlob = big.NewInt(int64(len(kzg4844.Blob{}) * 31 / 32))
blobTxBlobGasPerBlob = big.NewInt(params.BlobTxBlobGasPerBlob)
Expand Down Expand Up @@ -510,6 +520,49 @@ func (b *BatchPoster) checkReverts(ctx context.Context, to int64) (bool, error)
return false, nil
}

func (b *BatchPoster) pollForL1PriceData(ctx context.Context) {
headerCh, unsubscribe := b.l1Reader.Subscribe(false)
defer unsubscribe()

blobGasLimitGauge.Update(params.MaxBlobGasPerBlock)
for {
select {
case h, ok := <-headerCh:
if !ok {
log.Info("L1 headers channel checking for l1 price data has been closed")
return
}
baseFeeGauge.Update(h.BaseFee.Int64())
l1GasPrice := h.BaseFee.Uint64()
if h.BlobGasUsed != nil {
if h.ExcessBlobGas != nil {
blobFeePerByte := eip4844.CalcBlobFee(eip4844.CalcExcessBlobGas(*h.ExcessBlobGas, *h.BlobGasUsed))
blobFeePerByte.Mul(blobFeePerByte, blobTxBlobGasPerBlob)
blobFeePerByte.Div(blobFeePerByte, usableBytesInBlob)
blobFeeGauge.Update(blobFeePerByte.Int64())
if l1GasPrice > blobFeePerByte.Uint64()/16 {
l1GasPrice = blobFeePerByte.Uint64() / 16
}
}
blobGasUsedGauge.Update(int64(*h.BlobGasUsed))
}
blockGasUsedGauge.Update(int64(h.GasUsed))
blockGasLimitGauge.Update(int64(h.GasLimit))
suggestedTipCap, err := b.l1Reader.Client().SuggestGasTipCap(ctx)
if err != nil {
log.Error("unable to fetch suggestedTipCap from l1 client to update arb/batchposter/suggestedtipcap metric", "err", err)
} else {
suggestedTipCapGauge.Update(suggestedTipCap.Int64())
}
l1GasPriceEstimate := b.streamer.CurrentEstimateOfL1GasPrice()
l1GasPriceGauge.Update(int64(l1GasPrice))
l1GasPriceEstimateGauge.Update(int64(l1GasPriceEstimate))
case <-ctx.Done():
return
}
}
}

// pollForReverts runs a gouroutine that listens to l1 block headers, checks
// if any transaction made by batch poster was reverted.
func (b *BatchPoster) pollForReverts(ctx context.Context) {
Expand Down Expand Up @@ -1272,6 +1325,14 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
"numBlobs", len(kzgBlobs),
)

surplus := arbmath.SaturatingMul(
arbmath.SaturatingSub(
l1GasPriceGauge.Snapshot().Value(),
l1GasPriceEstimateGauge.Snapshot().Value()),
int64(len(sequencerMsg)*16),
)
latestBatchSurplusGauge.Update(surplus)

recentlyHitL1Bounds := time.Since(b.lastHitL1Bounds) < config.PollInterval*3
postedMessages := b.building.msgCount - batchPosition.MessageCount
b.messagesPerBatch.Update(uint64(postedMessages))
Expand Down Expand Up @@ -1341,6 +1402,7 @@ func (b *BatchPoster) Start(ctxIn context.Context) {
b.redisLock.Start(ctxIn)
b.StopWaiter.Start(ctxIn, b)
b.LaunchThread(b.pollForReverts)
b.LaunchThread(b.pollForL1PriceData)
commonEphemeralErrorHandler := util.NewEphemeralErrorHandler(time.Minute, "", 0)
exceedMaxMempoolSizeEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, dataposter.ErrExceedsMaxMempoolSize.Error(), time.Minute)
storageRaceEphemeralErrorHandler := util.NewEphemeralErrorHandler(5*time.Minute, storage.ErrStorageRace.Error(), time.Minute)
Expand Down
11 changes: 11 additions & 0 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,17 @@ func CreateNode(
return currentNode, nil
}

func (n *Node) CacheL1PriceDataOfMsg(pos arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) {
n.TxStreamer.CacheL1PriceDataOfMsg(pos, callDataUnits, l1GasCharged)
}

func (n *Node) BacklogL1GasCharged() uint64 {
return n.TxStreamer.BacklogL1GasCharged()
}
func (n *Node) BacklogCallDataUnits() uint64 {
return n.TxStreamer.BacklogCallDataUnits()
}

func (n *Node) Start(ctx context.Context) error {
execClient, ok := n.Execution.(*gethexec.ExecutionNode)
if !ok {
Expand Down
122 changes: 122 additions & 0 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type TransactionStreamer struct {
broadcastServer *broadcaster.Broadcaster
inboxReader *InboxReader
delayedBridge *DelayedBridge

cachedL1PriceDataMutex sync.RWMutex
cachedL1PriceData *L1PriceData
}

type TransactionStreamerConfig struct {
Expand Down Expand Up @@ -112,6 +115,9 @@ func NewTransactionStreamer(
broadcastServer: broadcastServer,
fatalErrChan: fatalErrChan,
config: config,
cachedL1PriceData: &L1PriceData{
msgToL1PriceData: []L1PriceDataOfMsg{},
},
}
err := streamer.cleanupInconsistentState()
if err != nil {
Expand All @@ -120,6 +126,120 @@ func NewTransactionStreamer(
return streamer, nil
}

type L1PriceDataOfMsg struct {
callDataUnits uint64
cummulativeCallDataUnits uint64
l1GasCharged uint64
cummulativeL1GasCharged uint64
}

type L1PriceData struct {
startOfL1PriceDataCache arbutil.MessageIndex
endOfL1PriceDataCache arbutil.MessageIndex
msgToL1PriceData []L1PriceDataOfMsg
currentEstimateOfL1GasPrice uint64
}

func (s *TransactionStreamer) CurrentEstimateOfL1GasPrice() uint64 {
s.cachedL1PriceDataMutex.Lock()
defer s.cachedL1PriceDataMutex.Unlock()

currentEstimate, err := s.exec.GetL1GasPriceEstimate()
if err != nil {
log.Error("error fetching current L2 estimate of L1 gas price hence reusing cached estimate", "err", err)
} else {
s.cachedL1PriceData.currentEstimateOfL1GasPrice = currentEstimate
}
return s.cachedL1PriceData.currentEstimateOfL1GasPrice
}

func (s *TransactionStreamer) BacklogCallDataUnits() uint64 {
s.cachedL1PriceDataMutex.RLock()
defer s.cachedL1PriceDataMutex.RUnlock()

size := len(s.cachedL1PriceData.msgToL1PriceData)
if size == 0 {
return 0
}
return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits -
s.cachedL1PriceData.msgToL1PriceData[0].cummulativeCallDataUnits +
s.cachedL1PriceData.msgToL1PriceData[0].callDataUnits)
}

func (s *TransactionStreamer) BacklogL1GasCharged() uint64 {
s.cachedL1PriceDataMutex.RLock()
defer s.cachedL1PriceDataMutex.RUnlock()

size := len(s.cachedL1PriceData.msgToL1PriceData)
if size == 0 {
return 0
}
return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged -
s.cachedL1PriceData.msgToL1PriceData[0].cummulativeL1GasCharged +
s.cachedL1PriceData.msgToL1PriceData[0].l1GasCharged)
}

func (s *TransactionStreamer) TrimCache(to arbutil.MessageIndex) {
s.cachedL1PriceDataMutex.Lock()
defer s.cachedL1PriceDataMutex.Unlock()

if to < s.cachedL1PriceData.startOfL1PriceDataCache {
log.Info("trying to trim older cache which doesnt exist anymore")
} else if to >= s.cachedL1PriceData.endOfL1PriceDataCache {
s.cachedL1PriceData.startOfL1PriceDataCache = 0
s.cachedL1PriceData.endOfL1PriceDataCache = 0
s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{}
} else {
newStart := to - s.cachedL1PriceData.startOfL1PriceDataCache + 1
s.cachedL1PriceData.msgToL1PriceData = s.cachedL1PriceData.msgToL1PriceData[newStart:]
s.cachedL1PriceData.startOfL1PriceDataCache = to + 1
}
}

func (s *TransactionStreamer) CacheL1PriceDataOfMsg(seqNum arbutil.MessageIndex, callDataUnits uint64, l1GasCharged uint64) {
s.cachedL1PriceDataMutex.Lock()
defer s.cachedL1PriceDataMutex.Unlock()

resetCache := func() {
s.cachedL1PriceData.startOfL1PriceDataCache = seqNum
s.cachedL1PriceData.endOfL1PriceDataCache = seqNum
s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{{
callDataUnits: callDataUnits,
cummulativeCallDataUnits: callDataUnits,
l1GasCharged: l1GasCharged,
cummulativeL1GasCharged: l1GasCharged,
}}
}
size := len(s.cachedL1PriceData.msgToL1PriceData)
if size == 0 ||
s.cachedL1PriceData.startOfL1PriceDataCache == 0 ||
s.cachedL1PriceData.endOfL1PriceDataCache == 0 ||
arbutil.MessageIndex(size) != s.cachedL1PriceData.endOfL1PriceDataCache-s.cachedL1PriceData.startOfL1PriceDataCache+1 {
resetCache()
return
}
if seqNum != s.cachedL1PriceData.endOfL1PriceDataCache+1 {
if seqNum > s.cachedL1PriceData.endOfL1PriceDataCache+1 {
log.Info("message position higher then current end of l1 price data cache, resetting cache to this message")
resetCache()
} else if seqNum < s.cachedL1PriceData.startOfL1PriceDataCache {
log.Info("message position lower than start of l1 price data cache, ignoring")
} else {
log.Info("message position already seen in l1 price data cache, ignoring")
}
} else {
cummulativeCallDataUnits := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits
cummulativeL1GasCharged := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged
s.cachedL1PriceData.msgToL1PriceData = append(s.cachedL1PriceData.msgToL1PriceData, L1PriceDataOfMsg{
callDataUnits: callDataUnits,
cummulativeCallDataUnits: cummulativeCallDataUnits + callDataUnits,
l1GasCharged: l1GasCharged,
cummulativeL1GasCharged: cummulativeL1GasCharged + l1GasCharged,
})
s.cachedL1PriceData.endOfL1PriceDataCache = seqNum
}
}

// Encodes a uint64 as bytes in a lexically sortable manner for database iteration.
// Generally this is only used for database keys, which need sorted.
// A shorter RLP encoding is usually used for database values.
Expand Down Expand Up @@ -577,6 +697,8 @@ func endBatch(batch ethdb.Batch) error {

func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error {
if messagesAreConfirmed {
// Trim confirmed messages from l1pricedataCache
s.TrimCache(pos + arbutil.MessageIndex(len(messages)))
s.reorgMutex.RLock()
dups, _, _, err := s.countDuplicateMessages(pos, messages, nil)
s.reorgMutex.RUnlock()
Expand Down
17 changes: 17 additions & 0 deletions arbos/l1pricing/l1pricing.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,23 @@ func (ps *L1PricingState) SetUnitsSinceUpdate(units uint64) error {
return ps.unitsSinceUpdate.Set(units)
}

func (ps *L1PricingState) GetL1PricingSurplus() (*big.Int, error) {
fundsDueForRefunds, err := ps.BatchPosterTable().TotalFundsDue()
if err != nil {
return nil, err
}
fundsDueForRewards, err := ps.FundsDueForRewards()
if err != nil {
return nil, err
}
haveFunds, err := ps.L1FeesAvailable()
if err != nil {
return nil, err
}
needFunds := arbmath.BigAdd(fundsDueForRefunds, fundsDueForRewards)
return arbmath.BigSub(haveFunds, needFunds), nil
}

func (ps *L1PricingState) LastSurplus() (*big.Int, error) {
return ps.lastSurplus.Get()
}
Expand Down
51 changes: 51 additions & 0 deletions execution/gethexec/executionengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,8 @@ func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.
return nil, err
}

s.cacheL1PriceDataOfMsg(pos, receipts, block)

return block, nil
}

Expand Down Expand Up @@ -529,6 +531,55 @@ func (s *ExecutionEngine) ResultAtPos(pos arbutil.MessageIndex) (*execution.Mess
return s.resultFromHeader(s.bc.GetHeaderByNumber(s.MessageIndexToBlockNumber(pos)))
}

func (s *ExecutionEngine) GetL1GasPriceEstimate() (uint64, error) {
bc := s.bc
latestHeader := bc.CurrentBlock()
latestState, err := bc.StateAt(latestHeader.Root)
if err != nil {
return 0, errors.New("error getting latest statedb while fetching l2 Estimate of L1 GasPrice")
}
arbState, err := arbosState.OpenSystemArbosState(latestState, nil, true)
if err != nil {
return 0, errors.New("error opening system arbos state while fetching l2 Estimate of L1 GasPrice")
}
l2EstimateL1GasPrice, err := arbState.L1PricingState().PricePerUnit()
if err != nil {
return 0, errors.New("error fetching l2 Estimate of L1 GasPrice")
}
return l2EstimateL1GasPrice.Uint64(), nil
}

func (s *ExecutionEngine) getL1PricingSurplus() (int64, error) {
bc := s.bc
latestHeader := bc.CurrentBlock()
latestState, err := bc.StateAt(latestHeader.Root)
if err != nil {
return 0, errors.New("error getting latest statedb while fetching current L1 pricing surplus")
}
arbState, err := arbosState.OpenSystemArbosState(latestState, nil, true)
if err != nil {
return 0, errors.New("error opening system arbos state while fetching current L1 pricing surplus")
}
surplus, err := arbState.L1PricingState().GetL1PricingSurplus()
if err != nil {
return 0, errors.New("error fetching current L1 pricing surplus")
}
return surplus.Int64(), nil
}

func (s *ExecutionEngine) cacheL1PriceDataOfMsg(num arbutil.MessageIndex, receipts types.Receipts, block *types.Block) {
var gasUsedForL1 uint64
for i := 1; i < len(receipts); i++ {
gasUsedForL1 += receipts[i].GasUsedForL1
}
gasChargedForL1 := gasUsedForL1 * block.BaseFee().Uint64()
var callDataUnits uint64
for _, tx := range block.Transactions() {
callDataUnits += tx.CalldataUnits
}
s.consensus.CacheL1PriceDataOfMsg(num, callDataUnits, gasChargedForL1)
}

// DigestMessage is used to create a block by executing msg against the latest state and storing it.
// Also, while creating a block by executing msg against the latest state,
// in parallel, creates a block by executing msgForPrefetch (msg+1) against the latest state
Expand Down
4 changes: 4 additions & 0 deletions execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ func CreateExecutionNode(

}

func (n *ExecutionNode) GetL1GasPriceEstimate() (uint64, error) {
return n.ExecEngine.GetL1GasPriceEstimate()
}

func (n *ExecutionNode) Initialize(ctx context.Context) error {
n.ArbInterface.Initialize(n)
err := n.Backend.Start()
Expand Down
Loading

0 comments on commit d28682b

Please sign in to comment.