Skip to content

Commit

Permalink
indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkCherepovskyi committed Aug 24, 2023
1 parent 9357c85 commit f386b1e
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 6 deletions.
26 changes: 22 additions & 4 deletions internal/service/core/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/dl-only-tokens/back-listener/internal/service/core/rarimo"
"github.com/pkg/errors"
"gitlab.com/distributed_lab/logan/v3"
"math/big"
"sync"
)

Expand Down Expand Up @@ -79,7 +80,15 @@ func (h *ListenerHandler) autoInitContracts() error {
func (h *ListenerHandler) initListeners(data []config.NetInfo) error {
for _, network := range data {

preparedListener, err := h.prepareNewListener(network.Name, network.Address)
preparedListener, err := h.prepareNewListener(network.Name, network.Address, false)
if err != nil {
h.log.WithError(err).Error("failed to connect to rpc")
continue
}

h.addNewListener(preparedListener)

preparedListener, err = h.prepareNewListener(network.Name, network.Address, true)
if err != nil {
h.log.WithError(err).Error("failed to connect to rpc")
continue
Expand All @@ -91,7 +100,7 @@ func (h *ListenerHandler) initListeners(data []config.NetInfo) error {
return nil
}

func (h *ListenerHandler) prepareNewListener(network string, address string) (listener.Listener, error) {
func (h *ListenerHandler) prepareNewListener(network string, address string, isIndexer bool) (listener.Listener, error) {
netInfo := h.findNetwork(network)
if netInfo == nil {
return nil, errors.New(fmt.Sprintf("unsupported network: %s", network))
Expand All @@ -108,7 +117,13 @@ func (h *ListenerHandler) prepareNewListener(network string, address string) (li
NetworkName: network,
}

return listener.NewListener(h.ctx, h.log, h.pauseTime, info, h.masterQ, h.txMetaData, h.healthCheckChan, h.abiPath, nil), nil
var startBlock *big.Int

if isIndexer {
network = fmt.Sprint(network, "_indexer")
startBlock = big.NewInt(int64(netInfo.StartBlock))
}
return listener.NewListener(h.ctx, h.log, h.pauseTime, info, h.masterQ, h.txMetaData, h.healthCheckChan, h.abiPath, startBlock, isIndexer), nil
}

func (h *ListenerHandler) addNewListener(listener listener.Listener) {
Expand All @@ -117,8 +132,11 @@ func (h *ListenerHandler) addNewListener(listener listener.Listener) {

func (h *ListenerHandler) healthCheck() {
for {

failedListeners := <-h.healthCheckChan
if failedListeners.IsIndexer() {
continue
}

go failedListeners.Restart(h.ctx)
}
}
Expand Down
26 changes: 24 additions & 2 deletions internal/service/core/listener/listenr.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Listener interface {
Run()
GetNetwork() string
Restart(parent context.Context)
IsIndexer() bool
}

type ListenData struct {
Expand All @@ -43,6 +44,7 @@ type ListenData struct {
clientRPC *ethclient.Client
lastListenedBlock *big.Int
lastBlock uint64
isIndexer bool
}

type EthInfo struct {
Expand All @@ -52,7 +54,7 @@ type EthInfo struct {
NetworkName string
}

func NewListener(parentCtx context.Context, log *logan.Entry, pauseTime int, ethInfo EthInfo, masterQ data.MasterQ, metaData *config.MetaData, healthCheckChan chan Listener, abiPath string, lastBlock *big.Int) Listener {
func NewListener(parentCtx context.Context, log *logan.Entry, pauseTime int, ethInfo EthInfo, masterQ data.MasterQ, metaData *config.MetaData, healthCheckChan chan Listener, abiPath string, lastBlock *big.Int, isIndexer bool) Listener {
ctx, cancelFunc := context.WithCancel(parentCtx)
return &ListenData{
chainID: ethInfo.ChainID,
Expand All @@ -68,13 +70,18 @@ func NewListener(parentCtx context.Context, log *logan.Entry, pauseTime int, eth
healthCheckChan: healthCheckChan,
abiPath: abiPath,
lastListenedBlock: lastBlock,
isIndexer: isIndexer,
}
}

func (l *ListenData) GetNetwork() string {
return l.chainName
}

func (l *ListenData) IsIndexer() bool {
return l.isIndexer
}

func (l *ListenData) Restart(parent context.Context) {
l.ctx, l.ctxCancelFunc = context.WithCancel(parent)
l.log.Debug("restart")
Expand Down Expand Up @@ -109,26 +116,38 @@ func (l *ListenData) run(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
ticker.Reset(tickerTime)
ticker.Reset(1 * time.Second)
if !l.isIndexer {
ticker.Reset(tickerTime)
}
l.lastBlock, err = l.clientRPC.BlockNumber(ctx)
if err != nil {
l.log.WithError(err).Error(l.chainName, ": failed to get number of blocks")
l.ctxCancelFunc()
continue
}

if l.isIndexer {
l.log.Debug("INDEXER ", l.chainName)
}

if l.lastListenedBlock == nil {
l.lastListenedBlock = big.NewInt(int64(l.lastBlock))
}

for l.lastBlock >= l.lastListenedBlock.Uint64() {

block, err := l.clientRPC.BlockByNumber(context.Background(), l.lastListenedBlock)
if err != nil {
l.log.WithError(err).Error(l.chainName, ": failed to get last block ")
l.ctxCancelFunc()
continue
}

if l.isIndexer {
l.log.Debug("last block: ", l.lastListenedBlock.String(), " ", l.chainName)
}

l.lastListenedBlock = l.lastListenedBlock.Add(l.lastListenedBlock, big.NewInt(1))

hash := block.Hash()
Expand Down Expand Up @@ -169,6 +188,9 @@ func (l *ListenData) run(ctx context.Context) {
continue
}
}
if l.isIndexer {
return
}
}
}
}
Expand Down

0 comments on commit f386b1e

Please sign in to comment.