diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 85c265f12e..cf23cdf0b3 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -813,6 +813,13 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { subscriptionUpdates := make(map[uint32][]clobtypes.StreamUpdate) idsToRemove := make([]uint32, 0) + if len(sm.streamUpdateCache) > 0 { + fmt.Printf("!! Flushing stream, cache length = %v\n", len(sm.streamUpdateCache)) + // for _, u := range sm.streamUpdateCache { + // fmt.Printf("[%s]\n", u.String()) + // } + } + // Collect updates for each subscription. for i, update := range sm.streamUpdateCache { subscriptionIds := sm.streamUpdateSubscriptionCache[i] @@ -929,8 +936,10 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( sm.Lock() defer sm.Unlock() + fmt.Println("(Flushing stream pre batch update)") // Flush all pending updates, since we want the onchain updates to arrive in a batch. sm.FlushStreamUpdatesWithLock() + fmt.Println("(Finished flushing stream pre batch update)") // Cache updates to sync local ops queue sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates( @@ -967,9 +976,13 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( ) sm.cacheStreamUpdatesBySubaccountWithLock(subaccountStreamUpdates, subaccountIds) + fmt.Println("=== ! Flushing stream OF batch update==") + // Emit all stream updates in a single batch. // Note we still have the lock, which is released right before function returns. sm.FlushStreamUpdatesWithLock() + + fmt.Println("=== ! FINISHED stream OF batch update==") } // getStagedEventsFromFinalizeBlock returns staged events from `FinalizeBlock`. diff --git a/protocol/x/clob/keeper/order_state.go b/protocol/x/clob/keeper/order_state.go index 4fdd7a2987..03e9d76ed1 100644 --- a/protocol/x/clob/keeper/order_state.go +++ b/protocol/x/clob/keeper/order_state.go @@ -3,6 +3,7 @@ package keeper import ( "bytes" "encoding/binary" + "fmt" "cosmossdk.io/store/prefix" sdk "github.com/cosmos/cosmos-sdk/types" @@ -259,6 +260,7 @@ func (k Keeper) RemoveOrderFillAmount(ctx sdk.Context, orderId types.OrderId) { // If grpc stream is on, zero out the fill amount. if k.GetFullNodeStreamingManager().Enabled() { + fmt.Printf("!! RemoveOrderFillAmount, mode = %v, height = %v, block time = %v\n", ctx.ExecMode(), ctx.BlockHeight(), ctx.BlockTime()) allUpdates := types.NewOffchainUpdates() if message, success := off_chain_updates.CreateOrderUpdateMessage( ctx, diff --git a/protocol/x/clob/module.go b/protocol/x/clob/module.go index 0e06d7e035..4c05992ebc 100644 --- a/protocol/x/clob/module.go +++ b/protocol/x/clob/module.go @@ -215,5 +215,6 @@ func (am AppModule) PrepareCheckState(ctx context.Context) error { lib.UnwrapSDKContext(ctx, types.ModuleName), am.keeper, ) + fmt.Printf("Finished PrepareCheckStatefor height %v\n", lib.UnwrapSDKContext(ctx, types.ModuleName).BlockHeight()) return nil } diff --git a/protocol/x/clob/types/query.pb.go b/protocol/x/clob/types/query.pb.go index 4f4d01af14..296be3186a 100644 --- a/protocol/x/clob/types/query.pb.go +++ b/protocol/x/clob/types/query.pb.go @@ -889,7 +889,26 @@ type StreamUpdate struct { } func (m *StreamUpdate) Reset() { *m = StreamUpdate{} } -func (m *StreamUpdate) String() string { return proto.CompactTextString(m) } +func (m *StreamUpdate) String() string { + if m == nil { + return "" + } + var updateMessageStr string + switch v := m.GetUpdateMessage().(type) { + case *StreamUpdate_OrderbookUpdate: + updateMessageStr = fmt.Sprintf("OrderbookUpdate %v", v.OrderbookUpdate.Updates) + case *StreamUpdate_OrderFill: + updateMessageStr = fmt.Sprintf("OrderFill %v", v.OrderFill.ClobMatch) + case *StreamUpdate_TakerOrder: + updateMessageStr = fmt.Sprintf("TakerOrder %v", v.TakerOrder.TakerOrder) + case *StreamUpdate_SubaccountUpdate: + updateMessageStr = fmt.Sprintf("SubaccountUpdate %v", v.SubaccountUpdate.SubaccountId.Owner) + default: + updateMessageStr = "" + } + return fmt.Sprintf("StreamUpdate{BlockHeight: %d, ExecMode: %d, UpdateMessage: %s}", m.BlockHeight, m.ExecMode, updateMessageStr) +} + func (*StreamUpdate) ProtoMessage() {} func (*StreamUpdate) Descriptor() ([]byte, []int) { return fileDescriptor_3365c195b25c5bc0, []int{16} diff --git a/protocol/x/subaccounts/types/update.go b/protocol/x/subaccounts/types/update.go index 26da5aa66f..ff9e6df6e5 100644 --- a/protocol/x/subaccounts/types/update.go +++ b/protocol/x/subaccounts/types/update.go @@ -154,6 +154,6 @@ func (u UpdateType) String() string { // WITHDRAWAL_AND_TRANSFER_DELAY_AFTER_NEGATIVE_TNC_SUBACCOUNT_SEEN_BLOCKS defines the number of // blocks withdrawals and transfers will be blocked if a negative TNC subaccount is seen in state, // starting from the last block a negative TNC subaccount was seen. -const WITHDRAWAL_AND_TRANSFERS_BLOCKED_AFTER_NEGATIVE_TNC_SUBACCOUNT_SEEN_BLOCKS = 50 +const WITHDRAWAL_AND_TRANSFERS_BLOCKED_AFTER_NEGATIVE_TNC_SUBACCOUNT_SEEN_BLOCKS = 1 -const WITHDRAWAL_AND_TRANSFERS_BLOCKED_AFTER_CHAIN_OUTAGE_DURATION = 5 * time.Minute +const WITHDRAWAL_AND_TRANSFERS_BLOCKED_AFTER_CHAIN_OUTAGE_DURATION = 1 * time.Second