Skip to content

Commit

Permalink
fix: graceful stopping of the evm indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
onikonychev committed Sep 21, 2024
1 parent 8d1b438 commit fea103a
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 29 deletions.
2 changes: 1 addition & 1 deletion app/server/evm_tx_indexer_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ nibid evm-tx-index last-indexed latest
logger.Error("failed to open evm indexer DB", "error", err.Error())
return err
}
defer evmIndexerDB.Close()

evmTxIndexer := indexer.NewEVMTxIndexer(evmIndexerDB, logger.With("module", "evmindex"), clientCtx)
defer evmTxIndexer.CloseDBAndExit()

Check failure on line 50 in app/server/evm_tx_indexer_cli.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `evmTxIndexer.CloseDBAndExit` is not checked (errcheck)

tmdb, err := tmnode.DefaultDBProvider(&tmnode.DBContext{ID: "blockstore", Config: cfg})
if err != nil {
Expand Down
46 changes: 33 additions & 13 deletions app/server/evm_tx_indexer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ const (
type EVMTxIndexerService struct {
service.BaseService

txIndexer *indexer.EVMTxIndexer
client rpcclient.Client
txIndexer *indexer.EVMTxIndexer
client rpcclient.Client
cancelFunc context.CancelFunc
}

// NewEVMIndexerService returns a new service instance.
Expand All @@ -39,7 +40,9 @@ func NewEVMIndexerService(
// OnStart implements service.Service by subscribing for new blocks
// and indexing them by events.
func (service *EVMTxIndexerService) OnStart() error {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
service.cancelFunc = cancel

status, err := service.client.Status(ctx)
if err != nil {
return err
Expand All @@ -57,20 +60,29 @@ func (service *EVMTxIndexerService) OnStart() error {
return err
}

go func() {
go func(ctx context.Context) {
for {
msg := <-blockHeadersChan
eventDataHeader := msg.Data.(types.EventDataNewBlockHeader)
if eventDataHeader.Header.Height > latestBlock {
latestBlock = eventDataHeader.Header.Height
// notify
select {
case newBlockSignal <- struct{}{}:
default:
select {
case <-ctx.Done(): // Listen for context cancellation to stop the goroutine
service.Logger.Info("Stopping indexer goroutine")
err := service.txIndexer.CloseDBAndExit()
if err != nil {
service.Logger.Error("Error closing indexer DB", "err", err)
}
return
case msg := <-blockHeadersChan:
eventDataHeader := msg.Data.(types.EventDataNewBlockHeader)
if eventDataHeader.Header.Height > latestBlock {
latestBlock = eventDataHeader.Header.Height
// notify
select {
case newBlockSignal <- struct{}{}:
default:
}
}
}
}
}()
}(ctx)

lastBlock, err := service.txIndexer.LastIndexedBlock()
if err != nil {
Expand Down Expand Up @@ -106,3 +118,11 @@ func (service *EVMTxIndexerService) OnStart() error {
}
}
}

func (service *EVMTxIndexerService) OnStop() {
service.Logger.Info("Stopping EVMTxIndexerService")
if service.cancelFunc != nil {
// Call the cancel function to stop the goroutine
service.cancelFunc()
}
}
19 changes: 9 additions & 10 deletions app/server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,12 +385,12 @@ func startInProcess(ctx *sdkserver.Context, clientCtx client.Context, opts Start

var evmIdxer eth.EVMTxIndexer
if conf.JSONRPC.EnableIndexer {
idxer, err := OpenEVMIndexer(ctx, ctx.Logger, clientCtx, home)
evmTxIndexer, _, err := OpenEVMIndexer(ctx, ctx.Logger, clientCtx, home)
if err != nil {
logger.Error("failed to open evm indexer DB", "error", err.Error())
return err
}
evmIdxer = idxer
evmIdxer = evmTxIndexer
}

if conf.API.Enable || conf.JSONRPC.Enable {
Expand Down Expand Up @@ -508,7 +508,9 @@ func startInProcess(ctx *sdkserver.Context, clientCtx client.Context, opts Start

tmEndpoint := "/websocket"
tmRPCAddr := cfg.RPC.ListenAddress
httpSrv, httpSrvDone, err = StartJSONRPC(ctx, clientCtx, tmRPCAddr, tmEndpoint, &conf, evmIdxer)
httpSrv, httpSrvDone, err = StartJSONRPC(
ctx, clientCtx, tmRPCAddr, tmEndpoint, &conf, evmIdxer,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -594,15 +596,12 @@ func OpenIndexerDB(rootDir string, backendType dbm.BackendType) (dbm.DB, error)
}

func OpenEVMIndexer(
ctx *sdkserver.Context,
logger log.Logger,
clientCtx client.Context,
homeDir string,
) (eth.EVMTxIndexer, error) {
ctx *sdkserver.Context, logger log.Logger, clientCtx client.Context, homeDir string,
) (eth.EVMTxIndexer, *EVMTxIndexerService, error) {
idxDB, err := OpenIndexerDB(homeDir, sdkserver.GetAppDBBackend(ctx.Viper))
if err != nil {
logger.Error("failed to open evm indexer DB", "error", err.Error())
return nil, err
return nil, nil, err
}

idxLogger := ctx.Logger.With("indexer", "evm")
Expand All @@ -617,7 +616,7 @@ func OpenEVMIndexer(
errCh <- err
}
}()
return evmIndexer, nil
return evmIndexer, evmIndexerService, nil
}

func openTraceWriter(traceWriterFile string) (w io.Writer, err error) {
Expand Down
9 changes: 9 additions & 0 deletions eth/indexer/evm_tx_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ func LoadFirstBlock(db dbm.DB) (int64, error) {
return parseBlockNumberFromKey(it.Key())
}

// CloseDBAndExit should be called upon stopping the indexer
func (indexer *EVMTxIndexer) CloseDBAndExit() error {
err := indexer.db.Close()
if err != nil {
return errorsmod.Wrap(err, "CloseDBAndExit")
}
return nil
}

// isEthTx check if the tx is an eth tx
func isEthTx(tx sdk.Tx) bool {
extTx, ok := tx.(authante.HasExtensionOptionsTx)
Expand Down
3 changes: 2 additions & 1 deletion x/common/testutil/testnetwork/start_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,14 @@ func startNodeAndServers(cfg Config, val *Validator) error {
val.Logger.Log("Set EVM indexer")

homeDir := val.Ctx.Config.RootDir
evmTxIndexer, err := server.OpenEVMIndexer(
evmTxIndexer, evmTxIndexerService, err := server.OpenEVMIndexer(
val.Ctx, evmServerCtxLogger, val.ClientCtx, homeDir,
)
if err != nil {
return err
}
val.EthTxIndexer = evmTxIndexer
val.EthTxIndexerService = evmTxIndexerService

val.jsonrpc, val.jsonrpcDone, err = server.StartJSONRPC(val.Ctx, val.ClientCtx, tmRPCAddr, tmEndpoint, val.AppConfig, nil)
if err != nil {
Expand Down
15 changes: 11 additions & 4 deletions x/common/testutil/testnetwork/validator_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/stretchr/testify/suite"

appserver "github.com/NibiruChain/nibiru/v2/app/server"

serverconfig "github.com/NibiruChain/nibiru/v2/app/server/config"
"github.com/NibiruChain/nibiru/v2/eth"
ethrpc "github.com/NibiruChain/nibiru/v2/eth/rpc"
Expand Down Expand Up @@ -85,10 +87,11 @@ type Validator struct {
// - rpc.Local
RPCClient tmclient.Client

JSONRPCClient *ethclient.Client
EthRpcQueryClient *ethrpc.QueryClient
EthRpcBackend *backend.Backend
EthTxIndexer eth.EVMTxIndexer
JSONRPCClient *ethclient.Client
EthRpcQueryClient *ethrpc.QueryClient
EthRpcBackend *backend.Backend
EthTxIndexer eth.EVMTxIndexer
EthTxIndexerService *appserver.EVMTxIndexerService

EthRPC_ETH *rpcapi.EthAPI
EthRpc_WEB3 *rpcapi.APIWeb3
Expand Down Expand Up @@ -168,6 +171,10 @@ func stopValidatorNode(v *Validator) {
v.Logger.Log("✅ Successfully shut down JSON-RPC server")
v.jsonrpc = nil
}
if v.EthTxIndexerService != nil {
err := v.EthTxIndexerService.Stop()
v.Logger.Logf("❌ Error shutting down EVMTxIndexerService: %w", err)
}
}

if v.tmNode != nil {
Expand Down

0 comments on commit fea103a

Please sign in to comment.