Skip to content

Commit

Permalink
Move event staging logic into a generic EventStager
Browse files Browse the repository at this point in the history
  • Loading branch information
teddyding committed Oct 2, 2024
1 parent e2438dd commit 579734c
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 3 deletions.
13 changes: 13 additions & 0 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,13 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
subscriptionUpdates := make(map[uint32][]clobtypes.StreamUpdate)
idsToRemove := make([]uint32, 0)

if len(sm.streamUpdateCache) > 0 {
fmt.Printf("!! Flushing stream, cache length = %v\n", len(sm.streamUpdateCache))
// for _, u := range sm.streamUpdateCache {
// fmt.Printf("[%s]\n", u.String())
// }
}

// Collect updates for each subscription.
for i, update := range sm.streamUpdateCache {
subscriptionIds := sm.streamUpdateSubscriptionCache[i]
Expand Down Expand Up @@ -929,8 +936,10 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
sm.Lock()
defer sm.Unlock()

fmt.Println("(Flushing stream pre batch update)")
// Flush all pending updates, since we want the onchain updates to arrive in a batch.
sm.FlushStreamUpdatesWithLock()
fmt.Println("(Finished flushing stream pre batch update)")

// Cache updates to sync local ops queue
sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
Expand Down Expand Up @@ -967,9 +976,13 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
)
sm.cacheStreamUpdatesBySubaccountWithLock(subaccountStreamUpdates, subaccountIds)

fmt.Println("=== ! Flushing stream OF batch update==")

// Emit all stream updates in a single batch.
// Note we still have the lock, which is released right before function returns.
sm.FlushStreamUpdatesWithLock()

fmt.Println("=== ! FINISHED stream OF batch update==")
}

// getStagedEventsFromFinalizeBlock returns staged events from `FinalizeBlock`.
Expand Down
2 changes: 2 additions & 0 deletions protocol/x/clob/keeper/order_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package keeper
import (
"bytes"
"encoding/binary"
"fmt"

"cosmossdk.io/store/prefix"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -259,6 +260,7 @@ func (k Keeper) RemoveOrderFillAmount(ctx sdk.Context, orderId types.OrderId) {

// If grpc stream is on, zero out the fill amount.
if k.GetFullNodeStreamingManager().Enabled() {
fmt.Printf("!! RemoveOrderFillAmount, mode = %v, height = %v, block time = %v\n", ctx.ExecMode(), ctx.BlockHeight(), ctx.BlockTime())
allUpdates := types.NewOffchainUpdates()
if message, success := off_chain_updates.CreateOrderUpdateMessage(
ctx,
Expand Down
1 change: 1 addition & 0 deletions protocol/x/clob/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,5 +215,6 @@ func (am AppModule) PrepareCheckState(ctx context.Context) error {
lib.UnwrapSDKContext(ctx, types.ModuleName),
am.keeper,
)
fmt.Printf("Finished PrepareCheckStatefor height %v\n", lib.UnwrapSDKContext(ctx, types.ModuleName).BlockHeight())
return nil
}
21 changes: 20 additions & 1 deletion protocol/x/clob/types/query.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions protocol/x/subaccounts/types/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,6 @@ func (u UpdateType) String() string {
// WITHDRAWAL_AND_TRANSFER_DELAY_AFTER_NEGATIVE_TNC_SUBACCOUNT_SEEN_BLOCKS defines the number of
// blocks withdrawals and transfers will be blocked if a negative TNC subaccount is seen in state,
// starting from the last block a negative TNC subaccount was seen.
const WITHDRAWAL_AND_TRANSFERS_BLOCKED_AFTER_NEGATIVE_TNC_SUBACCOUNT_SEEN_BLOCKS = 50
const WITHDRAWAL_AND_TRANSFERS_BLOCKED_AFTER_NEGATIVE_TNC_SUBACCOUNT_SEEN_BLOCKS = 1

const WITHDRAWAL_AND_TRANSFERS_BLOCKED_AFTER_CHAIN_OUTAGE_DURATION = 5 * time.Minute
const WITHDRAWAL_AND_TRANSFERS_BLOCKED_AFTER_CHAIN_OUTAGE_DURATION = 1 * time.Second

0 comments on commit 579734c

Please sign in to comment.