Skip to content

Commit

Permalink
[CLOB-1040] liquidation daemon - grpc to get previous block info (#856)
Browse files Browse the repository at this point in the history
* [CLOB-1040] liquidation daemon - grpc to get previous block info

* comments
  • Loading branch information
jayy04 authored Dec 8, 2023
1 parent fde92cf commit fb70453
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 92 deletions.
30 changes: 15 additions & 15 deletions protocol/daemons/liquidation/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,32 @@ package client

import (
"context"
"time"

"github.com/dydxprotocol/v4-chain/protocol/daemons/server/types"
timelib "github.com/dydxprotocol/v4-chain/protocol/lib/time"
"time"

"github.com/cometbft/cometbft/libs/log"
appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags"
"github.com/dydxprotocol/v4-chain/protocol/daemons/flags"
"github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api"
daemontypes "github.com/dydxprotocol/v4-chain/protocol/daemons/types"
blocktimetypes "github.com/dydxprotocol/v4-chain/protocol/x/blocktime/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
)

// Client implements a daemon service client that periodically calculates and reports liquidatable subaccounts
// to the protocol.
type Client struct {
// Query clients
BlocktimeQueryClient blocktimetypes.QueryClient
SubaccountQueryClient satypes.QueryClient
ClobQueryClient clobtypes.QueryClient
LiquidationServiceClient api.LiquidationServiceClient

// include HealthCheckable to track the health of the daemon.
daemontypes.HealthCheckable

// logger is the logger for the daemon.
logger log.Logger
}
Expand Down Expand Up @@ -79,9 +86,11 @@ func (c *Client) Start(
}
}()

subaccountQueryClient := satypes.NewQueryClient(queryConn)
clobQueryClient := clobtypes.NewQueryClient(queryConn)
liquidationServiceClient := api.NewLiquidationServiceClient(daemonConn)
// Initialize the query clients. These are used to query the Cosmos gRPC query services.
c.BlocktimeQueryClient = blocktimetypes.NewQueryClient(queryConn)
c.SubaccountQueryClient = satypes.NewQueryClient(queryConn)
c.ClobQueryClient = clobtypes.NewQueryClient(queryConn)
c.LiquidationServiceClient = api.NewLiquidationServiceClient(daemonConn)

ticker := time.NewTicker(time.Duration(flags.Liquidation.LoopDelayMs) * time.Millisecond)
stop := make(chan bool)
Expand All @@ -94,9 +103,6 @@ func (c *Client) Start(
flags,
ticker,
stop,
subaccountQueryClient,
clobQueryClient,
liquidationServiceClient,
)

return nil
Expand All @@ -110,20 +116,14 @@ func StartLiquidationsDaemonTaskLoop(
flags flags.DaemonFlags,
ticker *time.Ticker,
stop <-chan bool,
subaccountQueryClient satypes.QueryClient,
clobQueryClient clobtypes.QueryClient,
liquidationServiceClient api.LiquidationServiceClient,
) {
for {
select {
case <-ticker.C:
if err := s.RunLiquidationDaemonTaskLoop(
client,
ctx,
client,
flags.Liquidation,
subaccountQueryClient,
clobQueryClient,
liquidationServiceClient,
); err != nil {
// TODO(DEC-947): Move daemon shutdown to application.
client.logger.Error("Liquidations daemon returned error", "error", err)
Expand Down
24 changes: 12 additions & 12 deletions protocol/daemons/liquidation/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"testing"

"github.com/cometbft/cometbft/libs/log"
"github.com/cosmos/cosmos-sdk/types/query"
appflags "github.com/dydxprotocol/v4-chain/protocol/app/flags"
Expand All @@ -20,7 +22,6 @@ import (
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"testing"
)

func TestStart_TcpConnectionFails(t *testing.T) {
Expand Down Expand Up @@ -227,14 +228,14 @@ func TestRunLiquidationDaemonTaskLoop(t *testing.T) {
s := client.SubTaskRunnerImpl{}

c := client.NewClient(log.NewNopLogger())
c.SubaccountQueryClient = queryClientMock
c.ClobQueryClient = queryClientMock
c.LiquidationServiceClient = queryClientMock

err := s.RunLiquidationDaemonTaskLoop(
c,
grpc.Ctx,
c,
flags.GetDefaultDaemonFlags().Liquidation,
queryClientMock,
queryClientMock,
queryClientMock,
)
if tc.expectedError != nil {
require.EqualError(t, err, tc.expectedError.Error())
Expand All @@ -261,12 +262,9 @@ func NewFakeSubTaskRunnerWithError(err error) *FakeSubTaskRunner {
// RunLiquidationDaemonTaskLoop is a mock implementation of the SubTaskRunner interface. It records the
// call as a sanity check, and returns the error set by NewFakeSubTaskRunnerWithError.
func (f *FakeSubTaskRunner) RunLiquidationDaemonTaskLoop(
_ *client.Client,
_ context.Context,
_ *client.Client,
_ flags.LiquidationFlags,
_ satypes.QueryClient,
_ clobtypes.QueryClient,
_ api.LiquidationServiceClient,
) error {
f.called = true
return f.err
Expand Down Expand Up @@ -314,6 +312,11 @@ func TestHealthCheck_Mixed(t *testing.T) {
// Run the sequence of task loop responses.
for _, taskLoopError := range tc.taskLoopResponses {
ticker, stop := daemontestutils.SingleTickTickerAndStop()

c.SubaccountQueryClient = &mocks.QueryClient{}
c.ClobQueryClient = &mocks.QueryClient{}
c.LiquidationServiceClient = &mocks.QueryClient{}

// Start the daemon task loop. Since we created a single-tick ticker, this will run for one iteration and
// return.
client.StartLiquidationsDaemonTaskLoop(
Expand All @@ -323,9 +326,6 @@ func TestHealthCheck_Mixed(t *testing.T) {
flags.GetDefaultDaemonFlags(),
ticker,
stop,
&mocks.QueryClient{},
&mocks.QueryClient{},
&mocks.QueryClient{},
)
}

Expand Down
41 changes: 30 additions & 11 deletions protocol/daemons/liquidation/client/grpc_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,38 @@ import (
"github.com/cosmos/cosmos-sdk/types/query"
"github.com/dydxprotocol/v4-chain/protocol/daemons/liquidation/api"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
blocktimetypes "github.com/dydxprotocol/v4-chain/protocol/x/blocktime/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
)

// GetPreviousBlockInfo queries a gRPC server using `QueryPreviousBlockInfoRequest`
// and returns the previous block height.
func (c *Client) GetPreviousBlockInfo(
ctx context.Context,
) (
blockHeight uint32,
err error,
) {
defer metrics.ModuleMeasureSince(
metrics.LiquidationDaemon,
metrics.DaemonGetPreviousBlockInfoLatency,
time.Now(),
)

query := &blocktimetypes.QueryPreviousBlockInfoRequest{}
response, err := c.BlocktimeQueryClient.PreviousBlockInfo(ctx, query)
if err != nil {
return 0, err
}

return response.Info.Height, nil
}

// GetAllSubaccounts queries a gRPC server and returns a list of subaccounts and
// their balances and open positions.
func GetAllSubaccounts(
daemon *Client,
func (c *Client) GetAllSubaccounts(
ctx context.Context,
client satypes.QueryClient,
limit uint64,
) (
subaccounts []satypes.Subaccount,
Expand All @@ -31,7 +53,7 @@ func GetAllSubaccounts(
for {
subaccountsFromKey, next, err := getSubaccountsFromKey(
ctx,
client,
c.SubaccountQueryClient,
limit,
nextKey,
)
Expand Down Expand Up @@ -60,10 +82,8 @@ func GetAllSubaccounts(

// CheckCollateralizationForSubaccounts queries a gRPC server using `AreSubaccountsLiquidatable`
// and returns a list of collateralization statuses for the given list of subaccount ids.
func CheckCollateralizationForSubaccounts(
daemon *Client,
func (c *Client) CheckCollateralizationForSubaccounts(
ctx context.Context,
client clobtypes.QueryClient,
subaccountIds []satypes.SubaccountId,
) (
results []clobtypes.AreSubaccountsLiquidatableResponse_Result,
Expand All @@ -79,7 +99,7 @@ func CheckCollateralizationForSubaccounts(
query := &clobtypes.AreSubaccountsLiquidatableRequest{
SubaccountIds: subaccountIds,
}
response, err := client.AreSubaccountsLiquidatable(ctx, query)
response, err := c.ClobQueryClient.AreSubaccountsLiquidatable(ctx, query)
if err != nil {
return nil, err
}
Expand All @@ -89,9 +109,8 @@ func CheckCollateralizationForSubaccounts(

// SendLiquidatableSubaccountIds sends a list of unique and potentially liquidatable
// subaccount ids to a gRPC server via `LiquidateSubaccounts`.
func SendLiquidatableSubaccountIds(
func (c *Client) SendLiquidatableSubaccountIds(
ctx context.Context,
client api.LiquidationServiceClient,
subaccountIds []satypes.SubaccountId,
) error {
defer telemetry.ModuleMeasureSince(
Expand All @@ -112,7 +131,7 @@ func SendLiquidatableSubaccountIds(
SubaccountIds: subaccountIds,
}

if _, err := client.LiquidateSubaccounts(ctx, request); err != nil {
if _, err := c.LiquidationServiceClient.LiquidateSubaccounts(ctx, request); err != nil {
return err
}
return nil
Expand Down
76 changes: 68 additions & 8 deletions protocol/daemons/liquidation/client/grpc_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,70 @@ import (
"github.com/dydxprotocol/v4-chain/protocol/mocks"
"github.com/dydxprotocol/v4-chain/protocol/testutil/constants"
"github.com/dydxprotocol/v4-chain/protocol/testutil/grpc"
blocktimetypes "github.com/dydxprotocol/v4-chain/protocol/x/blocktime/types"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestGetPreviousBlockInfo(t *testing.T) {
tests := map[string]struct {
// mocks
setupMocks func(
ctx context.Context,
mck *mocks.QueryClient,
)

// expectations
expectedBlockHeight uint32
expectedError error
}{
"Success": {
setupMocks: func(
ctx context.Context,
mck *mocks.QueryClient,
) {
response := &blocktimetypes.QueryPreviousBlockInfoResponse{
Info: &blocktimetypes.BlockInfo{
Height: uint32(50),
Timestamp: constants.TimeTen,
},
}
mck.On("PreviousBlockInfo", ctx, mock.Anything).Return(response, nil)
},
expectedBlockHeight: 50,
},
"Errors are propagated": {
setupMocks: func(
ctx context.Context,
mck *mocks.QueryClient,
) {
mck.On("PreviousBlockInfo", ctx, mock.Anything).Return(nil, errors.New("test error"))
},
expectedBlockHeight: 0,
expectedError: errors.New("test error"),
},
}

for name, tc := range tests {
t.Run(name, func(t *testing.T) {
queryClientMock := &mocks.QueryClient{}
tc.setupMocks(grpc.Ctx, queryClientMock)

daemon := client.NewClient(log.NewNopLogger())
daemon.BlocktimeQueryClient = queryClientMock
actualBlockHeight, err := daemon.GetPreviousBlockInfo(grpc.Ctx)

if err != nil {
require.EqualError(t, err, tc.expectedError.Error())
} else {
require.Equal(t, tc.expectedBlockHeight, actualBlockHeight)
}
})
}
}

func TestGetAllSubaccounts(t *testing.T) {
df := flags.GetDefaultDaemonFlags()
tests := map[string]struct {
Expand Down Expand Up @@ -101,11 +160,10 @@ func TestGetAllSubaccounts(t *testing.T) {
queryClientMock := &mocks.QueryClient{}
tc.setupMocks(grpc.Ctx, queryClientMock)

daemonClient := client.NewClient(log.NewNopLogger())
actual, err := client.GetAllSubaccounts(
daemonClient,
daemon := client.NewClient(log.NewNopLogger())
daemon.SubaccountQueryClient = queryClientMock
actual, err := daemon.GetAllSubaccounts(
grpc.Ctx,
queryClientMock,
df.Liquidation.SubaccountPageLimit,
)
if err != nil {
Expand Down Expand Up @@ -203,10 +261,9 @@ func TestCheckCollateralizationForSubaccounts(t *testing.T) {
tc.setupMocks(grpc.Ctx, queryClientMock, tc.expectedResults)

daemon := client.NewClient(log.NewNopLogger())
actual, err := client.CheckCollateralizationForSubaccounts(
daemon,
daemon.ClobQueryClient = queryClientMock
actual, err := daemon.CheckCollateralizationForSubaccounts(
grpc.Ctx,
queryClientMock,
tc.subaccountIds,
)

Expand Down Expand Up @@ -268,7 +325,10 @@ func TestSendLiquidatableSubaccountIds(t *testing.T) {
queryClientMock := &mocks.QueryClient{}
tc.setupMocks(grpc.Ctx, queryClientMock, tc.subaccountIds)

err := client.SendLiquidatableSubaccountIds(grpc.Ctx, queryClientMock, tc.subaccountIds)
daemon := client.NewClient(log.NewNopLogger())
daemon.LiquidationServiceClient = queryClientMock

err := daemon.SendLiquidatableSubaccountIds(grpc.Ctx, tc.subaccountIds)
require.Equal(t, tc.expectedError, err)
})
}
Expand Down
Loading

0 comments on commit fb70453

Please sign in to comment.