-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* add fixed indexer prototype * add fixed indexer runner * add a check if the order/match exists * fix order/match getting url * refactor indexer * fix ws subscription * fix update events unpacking * remove block range * Revert "remove block range" This reverts commit 420142f. * consider block range * make websocket usage optional * use override_last_block cfg field instead of defaultLastBlock const
- Loading branch information
1 parent
1ad7458
commit 3b538c5
Showing
10 changed files
with
470 additions
and
295 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
package service | ||
|
||
import ( | ||
"context" | ||
"math/big" | ||
"strconv" | ||
|
||
"github.com/Swapica/indexer-svc/internal/gobind" | ||
"github.com/ethereum/go-ethereum/core/types" | ||
"gitlab.com/distributed_lab/logan/v3" | ||
"gitlab.com/distributed_lab/logan/v3/errors" | ||
) | ||
|
||
func (r *indexer) handleOrderCreated(ctx context.Context, eventName string, log *types.Log) error { | ||
var event gobind.SwapicaOrderCreated | ||
|
||
err := r.swapicaAbi.UnpackIntoInterface(&event, eventName, log.Data) | ||
if err != nil { | ||
return errors.Wrap(err, "failed to unpack event", logan.F{ | ||
"event": eventName, | ||
}) | ||
} | ||
|
||
exists, err := r.orderExists(event.Order.OrderId.Int64()) | ||
if err != nil { | ||
return errors.Wrap(err, "failed to check if order exists") | ||
} | ||
if exists { | ||
return nil | ||
} | ||
|
||
if err = r.addOrder(ctx, event.Order, event.UseRelayer); err != nil { | ||
return errors.Wrap(err, "failed to index order") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (r *indexer) handleOrderUpdated(ctx context.Context, eventName string, log *types.Log) error { | ||
var event gobind.SwapicaOrderUpdated | ||
|
||
err := r.swapicaAbi.UnpackIntoInterface(&event, eventName, log.Data) | ||
if err != nil { | ||
return errors.Wrap(err, "failed to unpack event", logan.F{ | ||
"event": eventName, | ||
}) | ||
} | ||
|
||
id, err := strconv.ParseInt(log.Topics[1].String(), 0, 64) | ||
if err != nil { | ||
return errors.Wrap(err, "failed to parse order id from topic") | ||
} | ||
|
||
if err = r.updateOrder(ctx, big.NewInt(id), event.Status); err != nil { | ||
return errors.Wrap(err, "failed to index order") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (r *indexer) handleMatchCreated(ctx context.Context, eventName string, log *types.Log) error { | ||
var event gobind.SwapicaMatchCreated | ||
|
||
err := r.swapicaAbi.UnpackIntoInterface(&event, eventName, log.Data) | ||
if err != nil { | ||
return errors.Wrap(err, "failed to unpack event", logan.F{ | ||
"event": eventName, | ||
}) | ||
} | ||
|
||
exists, err := r.matchExists(event.Match.MatchId.Int64()) | ||
if err != nil { | ||
return errors.Wrap(err, "failed to check if match exists") | ||
} | ||
if exists { | ||
return nil | ||
} | ||
|
||
if err = r.addMatch(ctx, event.Match, event.UseRelayer); err != nil { | ||
return errors.Wrap(err, "failed to add match order") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (r *indexer) handleMatchUpdated(ctx context.Context, eventName string, log *types.Log) error { | ||
var event gobind.SwapicaMatchUpdated | ||
|
||
err := r.swapicaAbi.UnpackIntoInterface(&event, eventName, log.Data) | ||
if err != nil { | ||
return errors.Wrap(err, "failed to unpack event", logan.F{ | ||
"event": eventName, | ||
}) | ||
} | ||
|
||
id, err := strconv.ParseInt(log.Topics[1].String(), 0, 64) | ||
if err != nil { | ||
return errors.Wrap(err, "failed to parse match id from topic") | ||
} | ||
|
||
if err = r.updateMatch(ctx, big.NewInt(id), event.Status); err != nil { | ||
return errors.Wrap(err, "failed to update match order") | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
package service | ||
|
||
import ( | ||
"context" | ||
"github.com/Swapica/indexer-svc/internal/gobind" | ||
"github.com/Swapica/indexer-svc/internal/service/requests" | ||
"github.com/ethereum/go-ethereum" | ||
"github.com/ethereum/go-ethereum/common" | ||
"gitlab.com/distributed_lab/json-api-connector/cerrors" | ||
"gitlab.com/distributed_lab/logan/v3/errors" | ||
"math/big" | ||
"net/http" | ||
"net/url" | ||
"strconv" | ||
) | ||
|
||
var NotFound = errors.New("not found") | ||
|
||
func (r *indexer) filters() ethereum.FilterQuery { | ||
topics := make([]common.Hash, 0, len(r.handlers)) | ||
for eventName := range r.handlers { | ||
event := r.swapicaAbi.Events[eventName] | ||
|
||
topics = append(topics, event.ID) | ||
} | ||
|
||
filterQuery := ethereum.FilterQuery{ | ||
Addresses: []common.Address{ | ||
r.contractAddress, | ||
}, | ||
Topics: [][]common.Hash{ | ||
topics, | ||
}, | ||
} | ||
return filterQuery | ||
} | ||
|
||
func (r *indexer) addOrder(ctx context.Context, o gobind.ISwapicaOrder, useRelayer bool) error { | ||
log := r.log.WithField("order_id", o.OrderId.String()) | ||
log.Debug("adding new order") | ||
body := requests.NewAddOrder(o, r.chainID, useRelayer) | ||
u, _ := url.Parse("/orders") | ||
|
||
err := r.collector.PostJSON(u, body, ctx, nil) | ||
if isConflict(err) { | ||
log.Warn("order already exists in collector DB, skipping it") | ||
return nil | ||
} | ||
|
||
return errors.Wrap(err, "failed to add order into collector service") | ||
} | ||
|
||
func (r *indexer) updateOrder(ctx context.Context, id *big.Int, status gobind.ISwapicaOrderStatus) error { | ||
r.log.WithField("order_id", id.String()).Debug("updating order status") | ||
body := requests.NewUpdateOrder(id, status) | ||
u, _ := url.Parse(strconv.FormatInt(r.chainID, 10) + "/orders") | ||
err := r.collector.PatchJSON(u, body, ctx, nil) | ||
return errors.Wrap(err, "failed to update order in collector service") | ||
} | ||
|
||
func (r *indexer) orderExists(id int64) (bool, error) { | ||
u, err := url.Parse("/orders/" + strconv.FormatInt(id, 10)) | ||
if err != nil { | ||
return false, errors.Wrap(err, "failed to parse url") | ||
} | ||
|
||
var order Order | ||
|
||
err = r.collector.Get(u, &order) | ||
if err != nil && err.Error() != NotFound.Error() { | ||
return false, errors.Wrap(err, "failed to get order") | ||
} | ||
|
||
return id == order.OrderID, nil | ||
} | ||
|
||
func (r *indexer) addMatch(ctx context.Context, mo gobind.ISwapicaMatch, useRelayer bool) error { | ||
log := r.log.WithField("match_id", mo.MatchId.String()) | ||
log.Debug("adding new match order") | ||
body := requests.NewAddMatch(mo, r.chainID, useRelayer) | ||
u, _ := url.Parse("/match_orders") | ||
|
||
err := r.collector.PostJSON(u, body, ctx, nil) | ||
if isConflict(err) { | ||
log.Warn("match order already exists in collector DB, skipping it") | ||
return nil | ||
} | ||
|
||
return errors.Wrap(err, "failed to add match order into collector service") | ||
} | ||
|
||
func (r *indexer) updateMatch(ctx context.Context, id *big.Int, state uint8) error { | ||
r.log.WithField("match_id", id.String()).Debug("updating match state") | ||
body := requests.NewUpdateMatch(id, state) | ||
u, _ := url.Parse(strconv.FormatInt(r.chainID, 10) + "/match_orders") | ||
err := r.collector.PatchJSON(u, body, ctx, nil) | ||
return errors.Wrap(err, "failed to update match order in collector service") | ||
} | ||
|
||
func (r *indexer) matchExists(id int64) (bool, error) { | ||
u, err := url.Parse("/match_orders/" + strconv.FormatInt(id, 10)) | ||
if err != nil { | ||
return false, errors.Wrap(err, "failed to parse url") | ||
} | ||
|
||
var match Match | ||
|
||
err = r.collector.Get(u, &match) | ||
if err != nil && err.Error() != NotFound.Error() { | ||
return false, errors.Wrap(err, "failed to get match") | ||
} | ||
|
||
return id == match.MatchID, nil | ||
} | ||
|
||
func (r *indexer) updateLastBlock(ctx context.Context, lastBlock uint64) error { | ||
body := requests.NewUpdateBlock(lastBlock) | ||
u, _ := url.Parse(strconv.FormatInt(r.chainID, 10) + "/block") | ||
err := r.collector.PostJSON(u, body, ctx, nil) | ||
if err != nil { | ||
return errors.Wrap(err, "failed to save last block") | ||
} | ||
return nil | ||
} | ||
|
||
func isConflict(err error) bool { | ||
c, ok := err.(cerrors.Error) | ||
return ok && c.Status() == http.StatusConflict | ||
} |
Oops, something went wrong.