feat(evm): evm tx indexer service implemented #2044

Merged · 8 commits · Sep 23, 2024
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -120,7 +120,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [#2030](https://github.com/NibiruChain/nibiru/pull/2030) - refactor(eth/rpc): Delete unused code and improve logging in the eth and debug namespaces
- [#2031](https://github.com/NibiruChain/nibiru/pull/2031) - fix(evm): debug calls with custom tracer and tracer options
- [#2039](https://github.com/NibiruChain/nibiru/pull/2039) - refactor(rpc-backend): remove unnecessary interface code
- [#2039](https://github.com/NibiruChain/nibiru/pull/2039) - refactor(rpc-backend): Remove mocks from eth/rpc/backend, partially completing [nibiru#2037](https://github.com/NibiruChain/nibiru/issues/2037).
- [#2044](https://github.com/NibiruChain/nibiru/pull/2044) - feat(evm): evm tx indexer service implemented
- [#2045](https://github.com/NibiruChain/nibiru/pull/2045) - test(evm): backend tests with test network and real txs

#### Dapp modules: perp, spot, oracle, etc
133 changes: 133 additions & 0 deletions app/server/evm_tx_indexer_cli.go
@@ -0,0 +1,133 @@
// Copyright (c) 2023-2024 Nibi, Inc.
package server

import (
	"fmt"
	"strconv"

	"github.com/spf13/cobra"

	"github.com/NibiruChain/nibiru/v2/eth/indexer"

	tmnode "github.com/cometbft/cometbft/node"
	sm "github.com/cometbft/cometbft/state"
	tmstore "github.com/cometbft/cometbft/store"
	"github.com/cosmos/cosmos-sdk/client"
	"github.com/cosmos/cosmos-sdk/server"
)

func NewEVMTxIndexCmd() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "evm-tx-index [minBlockNumber|last-indexed] [maxBlockNumber|latest]",
		Short: "Index historical evm blocks and transactions",
		Long: `This command is useful for catching up if the node experienced a period
with the EVMTxIndexer turned off, or was stopped without properly closing/flushing the EVMIndexerDB.
It processes blocks from minBlockNumber to maxBlockNumber and indexes evm txs.

- minBlockNumber: min block to start indexing. Supply "last-indexed" to start from the latest block available in EVMIndexerDB.
- maxBlockNumber: max block; either a number or "latest".

The default run before starting a full node or archive node is:

nibid evm-tx-index last-indexed latest
`,
		Args: cobra.ExactArgs(2),
		RunE: func(cmd *cobra.Command, args []string) error {
			serverCtx := server.GetServerContextFromCmd(cmd)
			clientCtx, err := client.GetClientQueryContext(cmd)
			if err != nil {
				return err
			}
			cfg := serverCtx.Config
			logger := serverCtx.Logger
			evmIndexerDB, err := OpenIndexerDB(cfg.RootDir, server.GetAppDBBackend(serverCtx.Viper))
			if err != nil {
				logger.Error("failed to open evm indexer DB", "error", err.Error())
				return err
			}

			evmTxIndexer := indexer.NewEVMTxIndexer(evmIndexerDB, logger.With("module", "evmindex"), clientCtx)

			tmdb, err := tmnode.DefaultDBProvider(&tmnode.DBContext{ID: "blockstore", Config: cfg})
			if err != nil {
				return err
			}
			blockStore := tmstore.NewBlockStore(tmdb)
			minAvailableHeight := blockStore.Base()
			maxAvailableHeight := blockStore.Height()
			fmt.Printf("Block range available on the node: %d - %d\n", minAvailableHeight, maxAvailableHeight)
Review comment:

Use logger instead of fmt.Printf and fmt.Println for consistent logging

For better consistency and integration with the application's logging framework, replace fmt.Printf and fmt.Println with logger.Info or logger.Debug as appropriate.

Apply these changes:

             fmt.Printf("Block range available on the node: %d - %d\n", minAvailableHeight, maxAvailableHeight)
+            logger.Info("Block range available on the node", "minHeight", minAvailableHeight, "maxHeight", maxAvailableHeight)

             // ...

             fmt.Printf("Indexing blocks from %d to %d\n", fromBlock, toBlock)
+            logger.Info("Indexing blocks", "from", fromBlock, "to", toBlock)

             // ...

-                fmt.Println(height)
+                logger.Debug("Indexed block", "height", height)

             // ...

-            fmt.Println("Indexing complete")
+            logger.Info("Indexing complete")

Also applies to: 109-109, 123-123, 128-128
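
For reference, a minimal, self-contained sketch of how cometbft's libs/log logger (which the serverCtx logger appears to wrap) renders these key-value pairs; the heights here are made up:

package main

import (
	"os"

	"github.com/cometbft/cometbft/libs/log"
)

func main() {
	// Structured logger writing human-readable key=value pairs to stdout.
	logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))

	// Key-value pairs are appended after the message, roughly like:
	//   I[2024-09-23|12:00:00.000] Block range available on the node  minHeight=1 maxHeight=42
	logger.Info("Block range available on the node", "minHeight", int64(1), "maxHeight", int64(42))

	// Sub-loggers can carry a fixed context, as the PR does with module=evmindex.
	idxLogger := logger.With("module", "evmindex")
	idxLogger.Debug("Indexed block", "height", int64(7))
}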


			var fromBlock int64
			var toBlock int64

			// FROM block could be one of two:
			// - int64 number - replaced with minAvailableHeight if too low
			// - last-indexed - latest available block in EVMIndexerDB, 0 if nothing is indexed
			if args[0] == "last-indexed" {
				fromBlock, err = evmTxIndexer.LastIndexedBlock()
				if err != nil || fromBlock < 0 {
					fromBlock = 0
				}
			} else {
				fromBlock, err = strconv.ParseInt(args[1], 10, 64)
				if err != nil {
					return fmt.Errorf("cannot parse min block number: %s", args[1])
				}
Review comment on lines +72 to +75:

Fix incorrect parsing of minBlockNumber argument

The code incorrectly parses args[1] instead of args[0] when handling the minBlockNumber input. This will lead to unexpected behavior when users provide a numeric minBlockNumber: for example, `nibid evm-tx-index 100 latest` would try to parse "latest" as the minimum block and fail.

Apply this diff to correct the argument parsing:

             } else {
-                fromBlock, err = strconv.ParseInt(args[1], 10, 64)
+                fromBlock, err = strconv.ParseInt(args[0], 10, 64)
                 if err != nil {
-                    return fmt.Errorf("cannot parse min block number: %s", args[1])
+                    return fmt.Errorf("cannot parse min block number: %s", args[0])
                 }
                 if fromBlock > maxAvailableHeight {
                     return fmt.Errorf("maximum available block is: %d", maxAvailableHeight)

				if fromBlock > maxAvailableHeight {
					return fmt.Errorf("maximum available block is: %d", maxAvailableHeight)
				}
			}
			if fromBlock < minAvailableHeight {
				fromBlock = minAvailableHeight
			}

			// TO block could be one of two:
			// - int64 number - replaced with maxAvailableHeight if too high
			// - latest - latest available block in the node
			if args[1] == "latest" {
				toBlock = maxAvailableHeight
			} else {
				toBlock, err = strconv.ParseInt(args[1], 10, 64)
				if err != nil {
					return fmt.Errorf("cannot parse max block number: %s", args[1])
				}
				if toBlock > maxAvailableHeight {
					toBlock = maxAvailableHeight
				}
			}
Review comment on lines +95 to +97:

Add check for toBlock less than minAvailableHeight

Currently, there is no validation to ensure that toBlock is not less than the minimum available block height. This could cause the indexing process to malfunction if toBlock is set to a height lower than what's available.

Consider adding this check:

if toBlock < minAvailableHeight {
    return fmt.Errorf("maxBlockNumber (%d) cannot be less than the minimum available block height (%d)", toBlock, minAvailableHeight)
}
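
For what it's worth, both range checks could live in one helper next to the parsing code. A sketch, not part of the PR; the helper name is hypothetical:

package server

import "fmt"

// validateBlockRange is a hypothetical consolidation of the bounds checks
// discussed in this thread and performed inline by the command.
func validateBlockRange(fromBlock, toBlock, minAvailable int64) error {
	if toBlock < minAvailable {
		return fmt.Errorf("maxBlockNumber (%d) cannot be less than the minimum available block height (%d)", toBlock, minAvailable)
	}
	if fromBlock > toBlock {
		return fmt.Errorf("minBlockNumber must be less than or equal to maxBlockNumber")
	}
	return nil
}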

			if fromBlock > toBlock {
				return fmt.Errorf("minBlockNumber must be less than or equal to maxBlockNumber")
			}
			stateDB, err := tmnode.DefaultDBProvider(&tmnode.DBContext{ID: "state", Config: cfg})
			if err != nil {
				return err
			}
			stateStore := sm.NewStore(stateDB, sm.StoreOptions{
				DiscardABCIResponses: cfg.Storage.DiscardABCIResponses,
			})

			fmt.Printf("Indexing blocks from %d to %d\n", fromBlock, toBlock)
			for height := fromBlock; height <= toBlock; height++ {
				block := blockStore.LoadBlock(height)
				if block == nil {
					return fmt.Errorf("block not found %d", height)
				}
				blockResults, err := stateStore.LoadABCIResponses(height)
				if err != nil {
					return err
				}
				if err := evmTxIndexer.IndexBlock(block, blockResults.DeliverTxs); err != nil {
					return err
				}
				fmt.Println(height)
			}
			err = evmTxIndexer.CloseDBAndExit()
			if err != nil {
				return err
			}
Review comment on lines +124 to +127:

Codebase verification: inconsistent error handling for CloseDBAndExit in app/server/evm_tx_indexer_cli.go.

  • Consider adding error logging or additional context when returning the error to ensure proper cleanup handling.

Ensure proper cleanup when closing the indexer

The error returned by evmTxIndexer.CloseDBAndExit() is currently being returned without additional context. Confirm whether any additional cleanup is necessary or if error logging is sufficient to handle potential issues during the closing process.

The following script was run to search for other usages of CloseDBAndExit() and verify error handling practices:

#!/bin/bash
# Description: Find all usages of CloseDBAndExit in the codebase to verify error handling.

# Search for CloseDBAndExit method calls and display surrounding context.
rg --type go -C 3 $'CloseDBAndExit\(\)'

fmt.Println("Indexing complete")
return nil
},
}
return cmd
}
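
The diff does not show where this command gets registered. Hypothetically, wiring it into a cobra root command would look like the sketch below; the import path is assumed from the module path used elsewhere in this PR:

package main

import (
	"os"

	"github.com/spf13/cobra"

	// Assumed import path for the package shown in this diff.
	"github.com/NibiruChain/nibiru/v2/app/server"
)

func main() {
	rootCmd := &cobra.Command{Use: "nibid"}
	// Register the indexing command; a node operator would then run:
	//   nibid evm-tx-index last-indexed latest
	rootCmd.AddCommand(server.NewEVMTxIndexCmd())
	if err := rootCmd.Execute(); err != nil {
		os.Exit(1)
	}
}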
135 changes: 135 additions & 0 deletions app/server/evm_tx_indexer_service.go
@@ -0,0 +1,135 @@
// Copyright (c) 2023-2024 Nibi, Inc.
package server

import (
	"context"
	"sync/atomic"
	"time"

	"github.com/cometbft/cometbft/libs/service"
	rpcclient "github.com/cometbft/cometbft/rpc/client"
	"github.com/cometbft/cometbft/types"

	"github.com/NibiruChain/nibiru/v2/eth/indexer"
)

const (
	EVMTxIndexerServiceName = "EVMTxIndexerService"

	NewBlockWaitTimeout = 60 * time.Second
)

// EVMTxIndexerService indexes transactions for the json-rpc service.
type EVMTxIndexerService struct {
	service.BaseService

	evmTxIndexer *indexer.EVMTxIndexer
	rpcClient    rpcclient.Client
	cancelFunc   context.CancelFunc
}

// NewEVMIndexerService returns a new service instance.
func NewEVMIndexerService(evmTxIndexer *indexer.EVMTxIndexer, rpcClient rpcclient.Client) *EVMTxIndexerService {
	indexerService := &EVMTxIndexerService{evmTxIndexer: evmTxIndexer, rpcClient: rpcClient}
	indexerService.BaseService = *service.NewBaseService(nil, EVMTxIndexerServiceName, indexerService)
	return indexerService
}

// OnStart implements service.Service by subscribing to new blocks
// and indexing them by events.
func (service *EVMTxIndexerService) OnStart() error {
	ctx, cancel := context.WithCancel(context.Background())
	service.cancelFunc = cancel

	status, err := service.rpcClient.Status(ctx)
	if err != nil {
		return err
	}

	// chainHeightStorage is shared between the subscription goroutine and the
	// indexer loop, so reads and writes go through sync/atomic.
	var chainHeightStorage int64
	atomic.StoreInt64(&chainHeightStorage, status.SyncInfo.LatestBlockHeight)

	newBlockSignal := make(chan struct{}, 1)
	blockHeadersChan, err := service.rpcClient.Subscribe(
		ctx,
		EVMTxIndexerServiceName,
		types.QueryForEvent(types.EventNewBlockHeader).String(),
		0,
	)
	if err != nil {
		return err
	}

	// Goroutine listening for new blocks
	go func(ctx context.Context) {
		for {
			select {
			case <-ctx.Done():
				service.Logger.Info("Stopping indexer goroutine")
				err := service.evmTxIndexer.CloseDBAndExit()
				if err != nil {
					service.Logger.Error("Error closing indexer DB", "err", err)
				}
				return
			case msg := <-blockHeadersChan:
				eventDataHeader := msg.Data.(types.EventDataNewBlockHeader)
				currentChainHeight := eventDataHeader.Header.Height
				chainHeight := atomic.LoadInt64(&chainHeightStorage)
				if currentChainHeight > chainHeight {
					atomic.StoreInt64(&chainHeightStorage, currentChainHeight)
					// notify the indexer loop without blocking
					select {
					case newBlockSignal <- struct{}{}:
					default:
					}
				}
			}
		}
	}(ctx)
Review comment on lines +65 to +89:

Ensure proper resource cleanup on service stop

The goroutine listening for new blocks calls service.evmTxIndexer.CloseDBAndExit() when the context is canceled. This could lead to race conditions or multiple calls to close the database if there are other goroutines or if OnStop also attempts to close the database.

Suggestion: Move the database closure to the OnStop method to ensure it is called only once during the service shutdown process.

Apply the following changes:

--- a/app/server/evm_tx_indexer_service.go
+++ b/app/server/evm_tx_indexer_service.go
@@ -69,10 +69,7 @@
 		case <-ctx.Done():
 			service.Logger.Info("Stopping indexer goroutine")
-			err := service.evmTxIndexer.CloseDBAndExit()
-			if err != nil {
-				service.Logger.Error("Error closing indexer DB", "err", err)
-			}
+			// Context canceled, exiting goroutine
 			return
 		case msg := <-blockHeadersChan:

Then, in the OnStop method:

 func (service *EVMTxIndexerService) OnStop() {
 	service.Logger.Info("Stopping EVMTxIndexerService")
 	if service.cancelFunc != nil {
 		service.Logger.Info("Calling EVMIndexerService CancelFunc")
 		service.cancelFunc()
 	}
+	err := service.evmTxIndexer.CloseDBAndExit()
+	if err != nil {
+		service.Logger.Error("Error closing indexer DB", "err", err)
+	}
 }
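
Alternatively, if both shutdown paths could end up closing the DB, a sync.Once guard is a common way to make the closure idempotent. A minimal sketch, under the assumption that CloseDBAndExit must not run twice; the wrapper type is hypothetical and not part of the PR:

package server

import "sync"

// closeOnceIndexer is a hypothetical wrapper that makes CloseDBAndExit
// idempotent, so the subscription goroutine and OnStop can both call it.
type closeOnceIndexer struct {
	inner interface{ CloseDBAndExit() error }

	once     sync.Once
	closeErr error
}

// CloseDBAndExit closes the underlying indexer exactly once and replays the
// first result to every subsequent caller.
func (c *closeOnceIndexer) CloseDBAndExit() error {
	c.once.Do(func() {
		c.closeErr = c.inner.CloseDBAndExit()
	})
	return c.closeErr
}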


	lastIndexedHeight, err := service.evmTxIndexer.LastIndexedBlock()
	if err != nil {
		return err
	}
	if lastIndexedHeight == -1 {
		lastIndexedHeight = atomic.LoadInt64(&chainHeightStorage)
	}

	// Indexer loop
	for {
		chainHeight := atomic.LoadInt64(&chainHeightStorage)
		if chainHeight <= lastIndexedHeight {
			// nothing to index; wait for a new-block signal
			select {
			case <-newBlockSignal:
			case <-time.After(NewBlockWaitTimeout):
			}
			continue
		}
		for i := lastIndexedHeight + 1; i <= chainHeight; i++ {
			block, err := service.rpcClient.Block(ctx, &i)
			if err != nil {
				service.Logger.Error("failed to fetch block", "height", i, "err", err)
				break
			}
			blockResult, err := service.rpcClient.BlockResults(ctx, &i)
			if err != nil {
				service.Logger.Error("failed to fetch block result", "height", i, "err", err)
				break
			}
Review comment on lines +111 to +120:

Continue indexing after transient RPC errors

When an error occurs while fetching a block or block result, the current implementation breaks out of the loop. This could halt the indexing process indefinitely due to a transient error.

Suggestion: Modify the error handling to log the error and continue with the next block instead of breaking the loop.

 			block, err := service.rpcClient.Block(ctx, &i)
 			if err != nil {
 				service.Logger.Error("failed to fetch block", "height", i, "err", err)
-				break
+				continue
 			}
 
 			blockResult, err := service.rpcClient.BlockResults(ctx, &i)
 			if err != nil {
 				service.Logger.Error("failed to fetch block result", "height", i, "err", err)
-				break
+				continue
 			}
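
A middle ground between break and continue would be to retry the fetch briefly before moving on, so a momentary RPC hiccup neither halts the loop nor skips a block. A hedged sketch; the helper is hypothetical and would wrap the service.rpcClient.Block and BlockResults calls inside the loop:

package server

import (
	"context"
	"time"
)

// fetchWithRetry is a hypothetical helper: retry a transient RPC call with
// linear backoff, bailing out early if the context is canceled.
func fetchWithRetry[T any](
	ctx context.Context, attempts int, backoff time.Duration,
	fetch func(context.Context) (T, error),
) (result T, err error) {
	for attempt := 1; attempt <= attempts; attempt++ {
		result, err = fetch(ctx)
		if err == nil {
			return result, nil
		}
		select {
		case <-ctx.Done():
			return result, ctx.Err()
		case <-time.After(time.Duration(attempt) * backoff):
			// back off a little longer after each failed attempt
		}
	}
	return result, err
}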

			if err := service.evmTxIndexer.IndexBlock(block.Block, blockResult.TxsResults); err != nil {
				service.Logger.Error("failed to index block", "height", i, "err", err)
			}
			lastIndexedHeight = blockResult.Height
		}
	}
}
Review comment on lines +100 to +127:

Handle context cancellation in the indexer loop

The indexer loop does not currently check for context cancellation. If the service is stopping, the loop may continue running, preventing a graceful shutdown.

Suggestion: Add a context check at the beginning of the loop to exit when the context is canceled.

 for {
+	select {
+	case <-ctx.Done():
+		service.Logger.Info("Context canceled, stopping indexer loop")
+		return
+	default:
+	}
+
 	chainHeight := atomic.LoadInt64(&chainHeightStorage)


func (service *EVMTxIndexerService) OnStop() {
	service.Logger.Info("Stopping EVMTxIndexerService")
	if service.cancelFunc != nil {
		service.Logger.Info("Calling EVMIndexerService CancelFunc")
		service.cancelFunc()
	}
}
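
For context, a hedged sketch of constructing and running the service: the embedded BaseService supplies Start/Stop, which dispatch to the OnStart/OnStop hooks above. The wiring site and the logger choice here are assumptions, not part of this diff:

package server

import (
	"os"

	"github.com/cometbft/cometbft/libs/log"
	rpcclient "github.com/cometbft/cometbft/rpc/client"

	"github.com/NibiruChain/nibiru/v2/eth/indexer"
)

// startEVMTxIndexerService is a hypothetical helper showing the lifecycle:
// Start() triggers OnStart above; Stop() triggers OnStop and the context
// cancellation that winds down the goroutine and indexer loop.
func startEVMTxIndexerService(idx *indexer.EVMTxIndexer, rpcClient rpcclient.Client) (*EVMTxIndexerService, error) {
	svc := NewEVMIndexerService(idx, rpcClient)
	svc.SetLogger(log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "evmindex"))
	if err := svc.Start(); err != nil {
		return nil, err
	}
	return svc, nil
}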
11 changes: 6 additions & 5 deletions app/server/json_rpc.go
@@ -21,14 +21,15 @@ import (
 )

 // StartJSONRPC starts the JSON-RPC server
-func StartJSONRPC(ctx *server.Context,
+func StartJSONRPC(
+	ctx *server.Context,
 	clientCtx client.Context,
 	tmRPCAddr,
 	tmEndpoint string,
 	config *srvconfig.Config,
 	indexer eth.EVMTxIndexer,
 ) (*http.Server, chan struct{}, error) {
-	tmWsClient := ConnectTmWS(tmRPCAddr, tmEndpoint, ctx.Logger)
+	tmWsClientForRPCApi := ConnectTmWS(tmRPCAddr, tmEndpoint, ctx.Logger)

 	logger := ctx.Logger.With("module", "geth")
 	ethlog.Root().SetHandler(ethlog.FuncHandler(func(r *ethlog.Record) error {
@@ -48,7 +49,7 @@ func StartJSONRPC(
 	allowUnprotectedTxs := config.JSONRPC.AllowUnprotectedTxs
 	rpcAPIArr := config.JSONRPC.API

-	apis := rpcapi.GetRPCAPIs(ctx, clientCtx, tmWsClient, allowUnprotectedTxs, indexer, rpcAPIArr)
+	apis := rpcapi.GetRPCAPIs(ctx, clientCtx, tmWsClientForRPCApi, allowUnprotectedTxs, indexer, rpcAPIArr)

 	for _, api := range apis {
 		if err := rpcServer.RegisterName(api.Namespace, api.Service); err != nil {
@@ -108,8 +109,8 @@ func StartJSONRPC(
 	ctx.Logger.Info("Starting JSON WebSocket server", "address", config.JSONRPC.WsAddress)

 	// allocate separate WS connection to Tendermint
-	tmWsClient = ConnectTmWS(tmRPCAddr, tmEndpoint, ctx.Logger)
-	wsSrv := rpcapi.NewWebsocketsServer(clientCtx, ctx.Logger, tmWsClient, config)
+	tmWsClientForRPCWs := ConnectTmWS(tmRPCAddr, tmEndpoint, ctx.Logger)
+	wsSrv := rpcapi.NewWebsocketsServer(clientCtx, ctx.Logger, tmWsClientForRPCWs, config)
 	wsSrv.Start()
 	return httpSrv, httpSrvDone, nil
 }