diff --git a/internal/service/core/handler/handler.go b/internal/service/core/handler/handler.go index 27248ef..83e945d 100644 --- a/internal/service/core/handler/handler.go +++ b/internal/service/core/handler/handler.go @@ -9,6 +9,7 @@ import ( "github.com/dl-only-tokens/back-listener/internal/service/core/rarimo" "github.com/pkg/errors" "gitlab.com/distributed_lab/logan/v3" + "math/big" "sync" ) @@ -79,7 +80,15 @@ func (h *ListenerHandler) autoInitContracts() error { func (h *ListenerHandler) initListeners(data []config.NetInfo) error { for _, network := range data { - preparedListener, err := h.prepareNewListener(network.Name, network.Address) + preparedListener, err := h.prepareNewListener(network.Name, network.Address, false) + if err != nil { + h.log.WithError(err).Error("failed to connect to rpc") + continue + } + + h.addNewListener(preparedListener) + + preparedListener, err = h.prepareNewListener(network.Name, network.Address, true) if err != nil { h.log.WithError(err).Error("failed to connect to rpc") continue @@ -91,7 +100,7 @@ func (h *ListenerHandler) initListeners(data []config.NetInfo) error { return nil } -func (h *ListenerHandler) prepareNewListener(network string, address string) (listener.Listener, error) { +func (h *ListenerHandler) prepareNewListener(network string, address string, isIndexer bool) (listener.Listener, error) { netInfo := h.findNetwork(network) if netInfo == nil { return nil, errors.New(fmt.Sprintf("unsupported network: %s", network)) @@ -108,7 +117,13 @@ func (h *ListenerHandler) prepareNewListener(network string, address string) (li NetworkName: network, } - return listener.NewListener(h.ctx, h.log, h.pauseTime, info, h.masterQ, h.txMetaData, h.healthCheckChan, h.abiPath, nil), nil + var startBlock *big.Int + + if isIndexer { + network = fmt.Sprint(network, "_indexer") + startBlock = big.NewInt(int64(netInfo.StartBlock)) + } + return listener.NewListener(h.ctx, h.log, h.pauseTime, info, h.masterQ, h.txMetaData, h.healthCheckChan, h.abiPath, startBlock, isIndexer), nil } func (h *ListenerHandler) addNewListener(listener listener.Listener) { @@ -117,8 +132,11 @@ func (h *ListenerHandler) addNewListener(listener listener.Listener) { func (h *ListenerHandler) healthCheck() { for { - failedListeners := <-h.healthCheckChan + if failedListeners.IsIndexer() { + continue + } + go failedListeners.Restart(h.ctx) } } diff --git a/internal/service/core/listener/listenr.go b/internal/service/core/listener/listenr.go index 06de8d8..cfa2b4d 100644 --- a/internal/service/core/listener/listenr.go +++ b/internal/service/core/listener/listenr.go @@ -25,6 +25,7 @@ type Listener interface { Run() GetNetwork() string Restart(parent context.Context) + IsIndexer() bool } type ListenData struct { @@ -43,6 +44,7 @@ type ListenData struct { clientRPC *ethclient.Client lastListenedBlock *big.Int lastBlock uint64 + isIndexer bool } type EthInfo struct { @@ -52,7 +54,7 @@ type EthInfo struct { NetworkName string } -func NewListener(parentCtx context.Context, log *logan.Entry, pauseTime int, ethInfo EthInfo, masterQ data.MasterQ, metaData *config.MetaData, healthCheckChan chan Listener, abiPath string, lastBlock *big.Int) Listener { +func NewListener(parentCtx context.Context, log *logan.Entry, pauseTime int, ethInfo EthInfo, masterQ data.MasterQ, metaData *config.MetaData, healthCheckChan chan Listener, abiPath string, lastBlock *big.Int, isIndexer bool) Listener { ctx, cancelFunc := context.WithCancel(parentCtx) return &ListenData{ chainID: ethInfo.ChainID, @@ -68,6 +70,7 @@ func NewListener(parentCtx context.Context, log *logan.Entry, pauseTime int, eth healthCheckChan: healthCheckChan, abiPath: abiPath, lastListenedBlock: lastBlock, + isIndexer: isIndexer, } } @@ -75,6 +78,10 @@ func (l *ListenData) GetNetwork() string { return l.chainName } +func (l *ListenData) IsIndexer() bool { + return l.isIndexer +} + func (l *ListenData) Restart(parent context.Context) { l.ctx, l.ctxCancelFunc = context.WithCancel(parent) l.log.Debug("restart") @@ -109,7 +116,10 @@ func (l *ListenData) run(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - ticker.Reset(tickerTime) + ticker.Reset(1 * time.Second) + if !l.isIndexer { + ticker.Reset(tickerTime) + } l.lastBlock, err = l.clientRPC.BlockNumber(ctx) if err != nil { l.log.WithError(err).Error(l.chainName, ": failed to get number of blocks") @@ -117,11 +127,16 @@ func (l *ListenData) run(ctx context.Context) { continue } + if l.isIndexer { + l.log.Debug("INDEXER ", l.chainName) + } + if l.lastListenedBlock == nil { l.lastListenedBlock = big.NewInt(int64(l.lastBlock)) } for l.lastBlock >= l.lastListenedBlock.Uint64() { + block, err := l.clientRPC.BlockByNumber(context.Background(), l.lastListenedBlock) if err != nil { l.log.WithError(err).Error(l.chainName, ": failed to get last block ") @@ -129,6 +144,10 @@ func (l *ListenData) run(ctx context.Context) { continue } + if l.isIndexer { + l.log.Debug("last block: ", l.lastListenedBlock.String(), " ", l.chainName) + } + l.lastListenedBlock = l.lastListenedBlock.Add(l.lastListenedBlock, big.NewInt(1)) hash := block.Hash() @@ -169,6 +188,9 @@ func (l *ListenData) run(ctx context.Context) { continue } } + if l.isIndexer { + return + } } } }