Skip to content

Commit

Permalink
remove logs
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkCherepovskyi committed Aug 24, 2023
1 parent cc5e89c commit e2fe9c2
Showing 1 changed file with 83 additions and 69 deletions.
152 changes: 83 additions & 69 deletions internal/service/core/listener/listenr.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,21 @@ type Listener interface {
}

type ListenData struct {
chainID int32
chainName string
log *logan.Entry
pauseTime int
ctx context.Context
ctxCancelFunc context.CancelFunc
rpc string
address string
masterQ data.MasterQ
txMetaData *config.MetaData
healthCheckChan chan Listener
abiPath string
clientRPC *ethclient.Client
lastBlock *big.Int
chainID int32
chainName string
log *logan.Entry
pauseTime int
ctx context.Context
ctxCancelFunc context.CancelFunc
rpc string
address string
masterQ data.MasterQ
txMetaData *config.MetaData
healthCheckChan chan Listener
abiPath string
clientRPC *ethclient.Client
lastListenedBlock *big.Int
lastBlock uint64
}

type EthInfo struct {
Expand All @@ -54,19 +55,19 @@ type EthInfo struct {
func NewListener(parentCtx context.Context, log *logan.Entry, pauseTime int, ethInfo EthInfo, masterQ data.MasterQ, metaData *config.MetaData, healthCheckChan chan Listener, abiPath string, lastBlock *big.Int) Listener {
ctx, cancelFunc := context.WithCancel(parentCtx)
return &ListenData{
chainID: ethInfo.ChainID,
chainName: ethInfo.NetworkName,
log: log,
pauseTime: pauseTime,
ctx: ctx,
ctxCancelFunc: cancelFunc,
rpc: ethInfo.RPC,
address: ethInfo.Address,
masterQ: masterQ,
txMetaData: metaData,
healthCheckChan: healthCheckChan,
abiPath: abiPath,
lastBlock: lastBlock,
chainID: ethInfo.ChainID,
chainName: ethInfo.NetworkName,
log: log,
pauseTime: pauseTime,
ctx: ctx,
ctxCancelFunc: cancelFunc,
rpc: ethInfo.RPC,
address: ethInfo.Address,
masterQ: masterQ,
txMetaData: metaData,
healthCheckChan: healthCheckChan,
abiPath: abiPath,
lastListenedBlock: lastBlock,
}
}

Expand Down Expand Up @@ -99,64 +100,77 @@ func (l *ListenData) Run() {
func (l *ListenData) run(ctx context.Context) {
contractAddress := common.HexToAddress(l.address)
var previewHash common.Hash
var err error

tickerTime := time.Duration(l.pauseTime) * 25 * time.Second
tickerTime := time.Duration(l.pauseTime) * 40 * time.Second
ticker := time.NewTicker(tickerTime)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
ticker.Reset(tickerTime)
block, err := l.clientRPC.BlockByNumber(context.Background(), l.lastBlock)
l.lastBlock, err = l.clientRPC.BlockNumber(ctx)
if err != nil {
l.log.WithError(err).Error(l.chainName, ": failed to get last block ")
l.log.WithError(err).Error(l.chainName, ": failed to get number of blocks")
l.ctxCancelFunc()
continue
}

hash := block.Hash()
l.log.Debug("hash: ", hash)
if previewHash == hash {
continue
}

go l.indexContractTxs(block)

query := ethereum.FilterQuery{
BlockHash: &hash,
Addresses: []common.Address{contractAddress},
}
previewHash = hash

logs, err := l.clientRPC.FilterLogs(context.Background(), query)
if err != nil {
continue
}

suitableTXs, err := l.parseRecipientFromEvent(logs, block.Hash())
if err != nil {
l.log.WithError(err).Error("failed to get suitable")
continue
if l.lastListenedBlock == nil {
l.lastListenedBlock = big.NewInt(int64(l.lastBlock))
}

if len(suitableTXs) == 0 {
continue
}

preapredTxs, err := l.prepareDataToInsert(l.getTxIntputsOnBlock(suitableTXs, block))
if err != nil {
l.log.WithError(err).Error("failed to get suitable")
continue
}
if err = l.insertTxs(preapredTxs); err != nil {
l.log.WithError(err).Error("failed to use transaction")
continue
for l.lastBlock >= l.lastListenedBlock.Uint64() {
block, err := l.clientRPC.BlockByNumber(context.Background(), l.lastListenedBlock)
if err != nil {
l.log.WithError(err).Error(l.chainName, ": failed to get last block ")
l.ctxCancelFunc()
continue
}

l.lastListenedBlock = l.lastListenedBlock.Add(l.lastListenedBlock, big.NewInt(1))

hash := block.Hash()
if previewHash == hash {
continue
}

go l.indexContractTxs(block)

query := ethereum.FilterQuery{
BlockHash: &hash,
Addresses: []common.Address{contractAddress},
}
previewHash = hash

logs, err := l.clientRPC.FilterLogs(context.Background(), query)
if err != nil {
continue
}

suitableTXs, err := l.parseRecipientFromEvent(logs, block.Hash())
if err != nil {
l.log.WithError(err).Error("failed to get suitable")
continue
}

if len(suitableTXs) == 0 {
continue
}

preapredTxs, err := l.prepareDataToInsert(l.getTxIntputsOnBlock(suitableTXs, block))
if err != nil {
l.log.WithError(err).Error("failed to get suitable")
continue
}
if err = l.insertTxs(preapredTxs); err != nil {
l.log.WithError(err).Error("failed to use transaction")
continue
}
}
}

}

}

func (l *ListenData) indexContractTxs(block *types.Block) {
Expand Down Expand Up @@ -291,11 +305,11 @@ func (l *ListenData) insertTxs(any interface{}) error {
return nil
}

finallTx, err := l.setTimestamp(l.packTX(selectedTxs[0], tx))
finalTx, err := l.setTimestamp(l.packTX(selectedTxs[0], tx))
if err != nil {
return errors.Wrap(err, " failed to set value and timestamp ")
}
if err = l.masterQ.TransactionsQ().New().FilterByPaymentID(tx.PaymentID).Update(finallTx); err != nil {
if err = l.masterQ.TransactionsQ().New().FilterByPaymentID(tx.PaymentID).Update(finalTx); err != nil {
return errors.Wrap(err, "failed to update tx to db")
}

Expand Down

0 comments on commit e2fe9c2

Please sign in to comment.