Skip to content

Commit

Permalink
[DO NOT COMMIT] example sync grpc client (orderbook, fills)
Browse files Browse the repository at this point in the history
  • Loading branch information
jayy04 authored and jonfung-dydx committed Aug 2, 2024
1 parent c522048 commit 42a21bd
Show file tree
Hide file tree
Showing 12 changed files with 698 additions and 15 deletions.
25 changes: 25 additions & 0 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ import (
// Grpc Streaming
streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc"
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"

// Grpc Streaming
streamingclient "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/client"
)

var (
Expand Down Expand Up @@ -343,6 +346,9 @@ type App struct {
// Slinky
oraclePrometheusServer *promserver.PrometheusServer
oracleMetrics servicemetrics.Metrics

// Grpc Streaming Test Client
GrpcStreamingTestClient *streamingclient.GrpcClient
}

// assertAppPreconditions assert invariants required for an application to start.
Expand Down Expand Up @@ -1410,6 +1416,11 @@ func New(
}
app.initializeRateLimiters()

if app.GrpcStreamingManager.Enabled() {
app.GrpcStreamingTestClient = streamingclient.NewGrpcClient(appFlags, app.Logger())
app.GrpcStreamingManager.SubscribeTestClient(app.GrpcStreamingTestClient)
}

// Report out app version and git commit. This will be run when validators restart.
version := version.NewInfo()
app.Logger().Info(
Expand Down Expand Up @@ -1671,6 +1682,20 @@ func (app *App) PrepareCheckStater(ctx sdk.Context) {
if err := app.ModuleManager.PrepareCheckState(ctx); err != nil {
panic(err)
}

// Comparing the local orderbook with memclob's orderbook.
if app.GrpcStreamingTestClient != nil {
app.ClobKeeper.CompareMemclobOrderbookWithLocalOrderbook(
ctx,
app.GrpcStreamingTestClient.GetOrderbook(0),
0,
)
app.ClobKeeper.CompareMemclobOrderbookWithLocalOrderbook(
ctx,
app.GrpcStreamingTestClient.GetOrderbook(1),
1,
)
}
}

// InitChainer application update at chain initialization.
Expand Down
1 change: 1 addition & 0 deletions protocol/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func TestAppIsFullyInitialized(t *testing.T) {
"BridgeClient",
"SlinkyClient",
"oraclePrometheusServer",
"GrpcStreamingTestClient",

// Any default constructed type can be considered initialized if the default is what is
// expected. getUninitializedStructFields relies on fields being the non-default and
Expand Down
20 changes: 20 additions & 0 deletions protocol/mocks/MemClob.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

310 changes: 310 additions & 0 deletions protocol/streaming/grpc/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
package client

import (
"sync"

"cosmossdk.io/log"

appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags"
v1 "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1"
v1types "github.com/dydxprotocol/v4-chain/protocol/indexer/protocol/v1/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
)

// Example client to consume data from a gRPC server.
type GrpcClient struct {
Logger log.Logger
Orderbook map[uint32]*LocalOrderbook
}

type LocalOrderbook struct {
sync.Mutex

OrderIdToOrder map[v1types.IndexerOrderId]v1types.IndexerOrder
Bids map[uint64][]v1types.IndexerOrder
Asks map[uint64][]v1types.IndexerOrder
FillAmounts map[v1types.IndexerOrderId]uint64

Logger log.Logger
}

func NewGrpcClient(appflags appflags.Flags, logger log.Logger) *GrpcClient {
logger = logger.With("module", "grpc-example-client")

client := &GrpcClient{
Logger: logger,
Orderbook: make(map[uint32]*LocalOrderbook),
}

// Subscribe to grpc orderbook updates.
// go func() {
// grpcClient := daemontypes.GrpcClientImpl{}

// // Make a connection to the Cosmos gRPC query services.
// queryConn, err := grpcClient.NewTcpConnection(context.Background(), appflags.GrpcAddress)
// if err != nil {
// logger.Error("Failed to establish gRPC connection to Cosmos gRPC query services", "error", err)
// return
// }
// defer func() {
// if err := grpcClient.CloseConnection(queryConn); err != nil {
// logger.Error("Failed to close gRPC connection", "error", err)
// }
// }()

// clobQueryClient := clobtypes.NewQueryClient(queryConn)
// updateClient, err := clobQueryClient.StreamOrderbookUpdates(
// context.Background(),
// &clobtypes.StreamOrderbookUpdatesRequest{
// ClobPairId: []uint32{0, 1},
// },
// )
// if err != nil {
// logger.Error("Failed to stream orderbook updates", "error", err)
// return
// }

// for {
// update, err := updateClient.Recv()
// if err != nil {
// logger.Error("Failed to receive orderbook update", "error", err)
// return
// }

// logger.Info("Received orderbook update", "update", update)
// client.Update(update)
// }
// }()
return client
}

// Read method
func (c *GrpcClient) GetOrderbookSnapshot(pairId uint32) *LocalOrderbook {
return c.GetOrderbook(pairId)
}

// Write method for stream orderbook updates.
func (c *GrpcClient) Update(updates *clobtypes.StreamOrderbookUpdatesResponse) {
for _, update := range updates.GetUpdates() {
if orderUpdate := update.GetOrderbookUpdate(); orderUpdate != nil {
c.ProcessOrderbookUpdate(orderUpdate)
}
if orderFill := update.GetOrderFill(); orderFill != nil {
c.ProcessFill(orderFill)
}
}
}

// Write method for order placement updates (indexer offchain events)
// Updates may be of the place typek, remove type, or update type.
func (c *GrpcClient) ProcessOrderbookUpdate(orderUpdate *clobtypes.StreamOrderbookUpdate) {
if orderUpdate.Snapshot {
c.Orderbook = make(map[uint32]*LocalOrderbook)
}

for _, update := range orderUpdate.Updates {
if orderPlace := update.GetOrderPlace(); orderPlace != nil {
order := orderPlace.GetOrder()
orderbook := c.GetOrderbook(order.OrderId.ClobPairId)
orderbook.AddOrder(*order)
}

if orderRemove := update.GetOrderRemove(); orderRemove != nil {
orderId := orderRemove.RemovedOrderId
orderbook := c.GetOrderbook(orderId.ClobPairId)
orderbook.RemoveOrder(*orderId)
}

if orderUpdate := update.GetOrderUpdate(); orderUpdate != nil {
orderId := orderUpdate.OrderId
orderbook := c.GetOrderbook(orderId.ClobPairId)
orderbook.SetOrderFillAmount(orderId, orderUpdate.TotalFilledQuantums)
}
}
}

// Write method for orderbook fills update.
// Fills are received whenever a match is emitted by the clob.
// Match can be either liquidation or a regular order match.
func (c *GrpcClient) ProcessFill(orderFill *clobtypes.StreamOrderbookFill) {
orderMap, fillAmountMap := orderListToMap(orderFill.Orders, orderFill.FillAmounts)
clobMatch := orderFill.ClobMatch

if matchOrders := clobMatch.GetMatchOrders(); matchOrders != nil {
c.ProcessMatchOrders(matchOrders, orderMap, fillAmountMap)
}

if matchPerpLiquidation := clobMatch.GetMatchPerpetualLiquidation(); matchPerpLiquidation != nil {
c.ProcessMatchPerpetualLiquidation(matchPerpLiquidation, orderMap, fillAmountMap)
}
}

func (c *GrpcClient) ProcessMatchPerpetualLiquidation(
perpLiquidation *clobtypes.MatchPerpetualLiquidation,
orderMap map[clobtypes.OrderId]clobtypes.Order,
fillAmountMap map[clobtypes.OrderId]uint64,
) {
localOrderbook := c.Orderbook[perpLiquidation.ClobPairId]
for _, fill := range perpLiquidation.GetFills() {
makerOrder := orderMap[fill.MakerOrderId]
indexerMakerOrderId := v1.OrderIdToIndexerOrderId(makerOrder.OrderId)
localOrderbook.SetOrderFillAmount(&indexerMakerOrderId, fillAmountMap[makerOrder.OrderId])
}
}

func (c *GrpcClient) ProcessMatchOrders(
matchOrders *clobtypes.MatchOrders,
orderMap map[clobtypes.OrderId]clobtypes.Order,
fillAmountMap map[clobtypes.OrderId]uint64,
) {
takerOrderId := matchOrders.TakerOrderId
clobPairId := takerOrderId.GetClobPairId()
localOrderbook := c.Orderbook[clobPairId]

indexerTakerOrder := v1.OrderIdToIndexerOrderId(takerOrderId)
localOrderbook.SetOrderFillAmount(&indexerTakerOrder, fillAmountMap[takerOrderId])

for _, fill := range matchOrders.Fills {
makerOrder := orderMap[fill.MakerOrderId]
indexerMakerOrder := v1.OrderIdToIndexerOrderId(makerOrder.OrderId)
localOrderbook.SetOrderFillAmount(&indexerMakerOrder, fillAmountMap[makerOrder.OrderId])
}
}

// orderListToMap generates a map from orderId to order and
// orderId to fill amount. Orders and fill amounts should be the same length.
func orderListToMap(
orders []clobtypes.Order,
fillAmounts []uint64,
) (
orderMap map[clobtypes.OrderId]clobtypes.Order,
fillAmountMap map[clobtypes.OrderId]uint64,
) {
orderMap = make(map[clobtypes.OrderId]clobtypes.Order, 0)
fillAmountMap = make(map[clobtypes.OrderId]uint64, 0)

for idx, order := range orders {
orderMap[order.OrderId] = order
fillAmountMap[order.OrderId] = fillAmounts[idx]
}
return orderMap, fillAmountMap
}

func (c *GrpcClient) GetOrderbook(pairId uint32) *LocalOrderbook {
if _, ok := c.Orderbook[pairId]; !ok {
c.Orderbook[pairId] = &LocalOrderbook{
OrderIdToOrder: make(map[v1types.IndexerOrderId]v1types.IndexerOrder),
Bids: make(map[uint64][]v1types.IndexerOrder),
Asks: make(map[uint64][]v1types.IndexerOrder),
FillAmounts: make(map[v1types.IndexerOrderId]uint64),

Logger: c.Logger,
}
}
return c.Orderbook[pairId]
}

// Add an order to the local orderbook.
func (l *LocalOrderbook) AddOrder(order v1types.IndexerOrder) {
l.Lock()
defer l.Unlock()

if _, ok := l.OrderIdToOrder[order.OrderId]; ok {
l.Logger.Error("order already exists in orderbook")
}

subticks := order.GetSubticks()
if order.Side == v1types.IndexerOrder_SIDE_BUY {
if _, ok := l.Bids[subticks]; !ok {
l.Bids[subticks] = make([]v1types.IndexerOrder, 0)
}
l.Bids[subticks] = append(
l.Bids[subticks],
order,
)
} else {
if _, ok := l.Asks[subticks]; !ok {
l.Asks[subticks] = make([]v1types.IndexerOrder, 0)
}
l.Asks[subticks] = append(
l.Asks[subticks],
order,
)
}

l.OrderIdToOrder[order.OrderId] = order
}

// Remove an order from local orderbook.
func (l *LocalOrderbook) RemoveOrder(orderId v1types.IndexerOrderId) {
l.Lock()
defer l.Unlock()

if _, ok := l.OrderIdToOrder[orderId]; !ok {
l.Logger.Error("order not found in orderbook")
}

order := l.OrderIdToOrder[orderId]
subticks := order.GetSubticks()

if order.Side == v1types.IndexerOrder_SIDE_BUY {
for i, o := range l.Bids[subticks] {
if o.OrderId == order.OrderId {
l.Bids[subticks] = append(
l.Bids[subticks][:i],
l.Bids[subticks][i+1:]...,
)
break
}
}
if len(l.Bids[subticks]) == 0 {
delete(l.Bids, subticks)
}
} else {
for i, o := range l.Asks[subticks] {
if o.OrderId == order.OrderId {
l.Asks[subticks] = append(
l.Asks[subticks][:i],
l.Asks[subticks][i+1:]...,
)
break
}
}
if len(l.Asks[subticks]) == 0 {
delete(l.Asks, subticks)
}
}

delete(l.OrderIdToOrder, orderId)
}

// Update the fill amount for an order.
func (l *LocalOrderbook) SetOrderFillAmount(
orderId *v1types.IndexerOrderId,
fillAmount uint64,
) {
l.Lock()
defer l.Unlock()

if fillAmount == 0 {
delete(l.FillAmounts, *orderId)
} else {
l.FillAmounts[*orderId] = fillAmount
}
}

// Utility method to convert indexer order id to clob types order id.
// This example client uses indexer order ids on the backend. Currently
// there is no difference between indexer order id and clob order id.
func IndexerOrderIdToOrderId(idxOrderId v1types.IndexerOrderId) *clobtypes.OrderId {
return &clobtypes.OrderId{
SubaccountId: satypes.SubaccountId{
Owner: idxOrderId.SubaccountId.Owner,
Number: idxOrderId.SubaccountId.Number,
},
ClientId: idxOrderId.ClientId,
OrderFlags: idxOrderId.OrderFlags,
ClobPairId: idxOrderId.ClobPairId,
}
}
Loading

0 comments on commit 42a21bd

Please sign in to comment.