diff --git a/protocol/mocks/ClobKeeper.go b/protocol/mocks/ClobKeeper.go index 5eb830f15d5..62ab311a6b1 100644 --- a/protocol/mocks/ClobKeeper.go +++ b/protocol/mocks/ClobKeeper.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.43.2. DO NOT EDIT. +// Code generated by mockery v2.44.1. DO NOT EDIT. package mocks @@ -706,9 +706,9 @@ func (_m *ClobKeeper) InitializeEquityTierLimit(ctx types.Context, config clobty return r0 } -// InitializeNewStreams provides a mock function with given fields: ctx -func (_m *ClobKeeper) InitializeNewStreams(ctx types.Context) { - _m.Called(ctx) +// InitializeNewStreams provides a mock function with given fields: ctx, subaccountSnapshots +func (_m *ClobKeeper) InitializeNewStreams(ctx types.Context, subaccountSnapshots map[subaccountstypes.SubaccountId]*subaccountstypes.StreamSubaccountUpdate) { + _m.Called(ctx, subaccountSnapshots) } // IsInitialized provides a mock function with given fields: diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 9e7fa9e64f9..77c47961d0c 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -3,6 +3,7 @@ package streaming import ( "fmt" "sync" + "sync/atomic" "time" satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" @@ -55,8 +56,8 @@ type FullNodeStreamingManagerImpl struct { type OrderbookSubscription struct { subscriptionId uint32 - // Initialize the subscription with orderbook snapshots. - initialize *sync.Once + // Whether the subscription is initialized with snapshot. + initialized *atomic.Bool // Clob pair ids to subscribe to. clobPairIds []uint32 @@ -75,6 +76,10 @@ type OrderbookSubscription struct { nextSnapshotBlock uint32 } +func (sub *OrderbookSubscription) IsInitialized() bool { + return sub.initialized.Load() +} + func NewFullNodeStreamingManager( logger log.Logger, flushIntervalMs uint32, @@ -159,7 +164,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( } subscription := &OrderbookSubscription{ subscriptionId: sm.nextSubscriptionId, - initialize: &sync.Once{}, + initialized: &atomic.Bool{}, // False by default. clobPairIds: clobPairIds, subaccountIds: sIds, messageSender: messageSender, @@ -674,9 +679,32 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { sm.EmitMetrics() } +func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams( + getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate, +) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate { + sm.Lock() + defer sm.Unlock() + + ret := make(map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate) + for _, subscription := range sm.orderbookSubscriptions { + if alreadyInitialized := subscription.initialized.Load(); alreadyInitialized { + continue + } + + for _, subaccountId := range subscription.subaccountIds { + if _, exists := ret[subaccountId]; exists { + continue + } + + ret[subaccountId] = getSubaccountSnapshot(subaccountId) + } + } + return ret +} + func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams( getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates, - getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate, + subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, blockHeight uint32, execMode sdk.ExecMode, ) { @@ -690,31 +718,35 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams( updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates) for subscriptionId, subscription := range sm.orderbookSubscriptions { - // If the snapshot block interval is enabled, reset the sync.Once in order to - // re-send snapshots out. - if sm.snapshotBlockInterval > 0 && - blockHeight == subscription.nextSnapshotBlock { - subscription.initialize = &sync.Once{} - } - - subscription.initialize.Do( - func() { - allUpdates := clobtypes.NewOffchainUpdates() - for _, clobPairId := range subscription.clobPairIds { - if _, ok := updatesByClobPairId[clobPairId]; !ok { - updatesByClobPairId[clobPairId] = getOrderbookSnapshot(clobtypes.ClobPairId(clobPairId)) - } - allUpdates.Append(updatesByClobPairId[clobPairId]) - } - saUpdates := []*satypes.StreamSubaccountUpdate{} - for _, subaccountId := range subscription.subaccountIds { - saUpdates = append(saUpdates, getSubaccountSnapshot(subaccountId)) + if alreadyInitialized := subscription.initialized.Swap(true); !alreadyInitialized { + allUpdates := clobtypes.NewOffchainUpdates() + for _, clobPairId := range subscription.clobPairIds { + if _, ok := updatesByClobPairId[clobPairId]; !ok { + updatesByClobPairId[clobPairId] = getOrderbookSnapshot(clobtypes.ClobPairId(clobPairId)) } - sm.SendCombinedSnapshot(allUpdates, saUpdates, subscriptionId, blockHeight, execMode) - if sm.snapshotBlockInterval != 0 { - subscription.nextSnapshotBlock = blockHeight + sm.snapshotBlockInterval + allUpdates.Append(updatesByClobPairId[clobPairId]) + } + + saUpdates := []*satypes.StreamSubaccountUpdate{} + for _, subaccountId := range subscription.subaccountIds { + // TODO comment. + if saUpdate, ok := subaccountSnapshots[subaccountId]; ok { + saUpdates = append(saUpdates, saUpdate) } - }, - ) + } + + sm.SendCombinedSnapshot(allUpdates, saUpdates, subscriptionId, blockHeight, execMode) + + if sm.snapshotBlockInterval != 0 { + subscription.nextSnapshotBlock = blockHeight + sm.snapshotBlockInterval + } + } + + // If the snapshot block interval is enabled and the next block is a snapshot block, + // reset the `atomic.Bool` so snapshots are sent for the next block. + if sm.snapshotBlockInterval > 0 && + blockHeight+1 == subscription.nextSnapshotBlock { + subscription.initialized = &atomic.Bool{} // False by default. + } } } diff --git a/protocol/streaming/noop_streaming_manager.go b/protocol/streaming/noop_streaming_manager.go index 1c2f630b366..6e9c00895d7 100644 --- a/protocol/streaming/noop_streaming_manager.go +++ b/protocol/streaming/noop_streaming_manager.go @@ -62,9 +62,15 @@ func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId) return false } +func (sm *NoopGrpcStreamingManager) GetSubaccountSnapshotsForInitStreams( + getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate, +) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate { + return nil +} + func (sm *NoopGrpcStreamingManager) InitializeNewStreams( getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates, - getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate, + subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, blockHeight uint32, execMode sdk.ExecMode, ) { diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go index 3858faec766..e3dff9d94bf 100644 --- a/protocol/streaming/types/interface.go +++ b/protocol/streaming/types/interface.go @@ -22,10 +22,13 @@ type FullNodeStreamingManager interface { // L3+ Orderbook updates. InitializeNewStreams( getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates, - getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate, + subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, blockHeight uint32, execMode sdk.ExecMode, ) + GetSubaccountSnapshotsForInitStreams( + getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate, + ) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, blockHeight uint32, diff --git a/protocol/x/clob/abci.go b/protocol/x/clob/abci.go index 641e49e897a..b9dc6db5401 100644 --- a/protocol/x/clob/abci.go +++ b/protocol/x/clob/abci.go @@ -14,6 +14,7 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" "github.com/dydxprotocol/v4-chain/protocol/x/clob/keeper" "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" ) // PreBlocker executes all ABCI PreBlock logic respective to the clob module. @@ -123,6 +124,12 @@ func PrepareCheckState( log.BlockHeight, ctx.BlockHeight()+1, ) + // TODO + var subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate + if keeper.GetFullNodeStreamingManager().Enabled() { + subaccountSnapshots = keeper.GetSubaccountSnapshotsForInitStreams(ctx) + } + // Prune any rate limiting information that is no longer relevant. keeper.PruneRateLimits(ctx) @@ -239,7 +246,10 @@ func PrepareCheckState( ) // Initialize new streams with orderbook snapshots, if any. - keeper.InitializeNewStreams(ctx) + keeper.InitializeNewStreams( + ctx, + subaccountSnapshots, + ) // Set per-orderbook gauges. keeper.MemClob.SetMemclobGauges(ctx) diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index 6817b43469e..0dc886d7859 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -3,9 +3,10 @@ package keeper import ( "errors" "fmt" - satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" "sync/atomic" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" + "cosmossdk.io/log" "cosmossdk.io/store/prefix" storetypes "cosmossdk.io/store/types" @@ -251,9 +252,31 @@ func (k *Keeper) SetAnteHandler(anteHandler sdk.AnteHandler) { k.antehandler = anteHandler } +func (k Keeper) GetSubaccountSnapshotsForInitStreams( + ctx sdk.Context, +) ( + subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, +) { + // TODO assert PrepareCheckState. + + return k.GetFullNodeStreamingManager().GetSubaccountSnapshotsForInitStreams( + func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate { + subaccountUpdate := k.subaccountsKeeper.GetStreamSubaccountUpdate( + ctx, + subaccountId, + true, + ) + return &subaccountUpdate + }, + ) +} + // InitializeNewStreams initializes new streams for all uninitialized clob pairs // by sending the corresponding orderbook snapshots. -func (k Keeper) InitializeNewStreams(ctx sdk.Context) { +func (k Keeper) InitializeNewStreams( + ctx sdk.Context, + subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, +) { streamingManager := k.GetFullNodeStreamingManager() streamingManager.InitializeNewStreams( @@ -263,14 +286,7 @@ func (k Keeper) InitializeNewStreams(ctx sdk.Context) { clobPairId, ) }, - func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate { - subaccountUpdate := k.subaccountsKeeper.GetStreamSubaccountUpdate( - ctx, - subaccountId, - true, - ) - return &subaccountUpdate - }, + subaccountSnapshots, lib.MustConvertIntegerToUint32(ctx.BlockHeight()), ctx.ExecMode(), ) diff --git a/protocol/x/clob/types/clob_keeper.go b/protocol/x/clob/types/clob_keeper.go index a961046f23f..9705439dc4c 100644 --- a/protocol/x/clob/types/clob_keeper.go +++ b/protocol/x/clob/types/clob_keeper.go @@ -137,7 +137,10 @@ type ClobKeeper interface { ) error UpdateLiquidationsConfig(ctx sdk.Context, config LiquidationsConfig) error // full node streaming - InitializeNewStreams(ctx sdk.Context) + InitializeNewStreams( + ctx sdk.Context, + subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, + ) SendOrderbookUpdates( ctx sdk.Context, offchainUpdates *OffchainUpdates,