From f0a39b476f9e2cf181974953c33a79960922d83f Mon Sep 17 00:00:00 2001 From: JoeGruff Date: Tue, 10 Sep 2024 17:46:47 +0900 Subject: [PATCH] binance: Check market depth subs. --- client/cmd/testbinance/main.go | 26 +++++++ client/mm/libxc/binance.go | 129 +++++++++++++++++++++++++++++-- client/mm/libxc/bntypes/types.go | 2 + 3 files changed, 152 insertions(+), 5 deletions(-) diff --git a/client/cmd/testbinance/main.go b/client/cmd/testbinance/main.go index 5b33b0d7b2..d5cc39e79a 100644 --- a/client/cmd/testbinance/main.go +++ b/client/cmd/testbinance/main.go @@ -538,6 +538,11 @@ func (f *fakeBinance) handleAccountSubscription(w http.ResponseWriter, r *http.R }() } +type listSubsResp struct { + ID uint64 `json:"id"` + Result []string `json:"result"` +} + func (f *fakeBinance) handleMarketStream(w http.ResponseWriter, r *http.Request) { streamsStr := r.URL.Query().Get("streams") if streamsStr == "" { @@ -587,6 +592,25 @@ func (f *fakeBinance) handleMarketStream(w http.ResponseWriter, r *http.Request) f.cleanMarkets() } + listSubscriptions := func(id uint64) { + f.marketsMtx.Lock() + defer f.marketsMtx.Unlock() + var streams []string + for mktID := range cl.markets { + streams = append(streams, fmt.Sprintf("%s@depth", mktID)) + } + resp := listSubsResp{ + ID: id, + Result: streams, + } + b, err := json.Marshal(resp) + if err != nil { + log.Errorf("LIST_SUBSCRIBE marshal error: %v", err) + } + cl.WSLink.SendRaw(b) + f.cleanMarkets() + } + conn, cm := f.newWSLink(w, r, func(b []byte) { var req bntypes.StreamSubscription if err := json.Unmarshal(b, &req); err != nil { @@ -598,6 +622,8 @@ func (f *fakeBinance) handleMarketStream(w http.ResponseWriter, r *http.Request) subscribe(req.Params) case "UNSUBSCRIBE": unsubscribe(req.Params) + case "LIST_SUBSCRIPTIONS": + listSubscriptions(req.ID) } }) if conn == nil { diff --git a/client/mm/libxc/binance.go b/client/mm/libxc/binance.go index 88fae302d3..c63a8b67f0 100644 --- a/client/mm/libxc/binance.go +++ b/client/mm/libxc/binance.go @@ -469,6 +469,9 @@ type binance struct { marketStreamMtx sync.RWMutex marketStream comms.WsConn + marketStreamRespsMtx sync.Mutex + marketStreamResps map[uint64]chan<- []string + booksMtx sync.RWMutex books map[string]*binanceOrderBook @@ -525,6 +528,7 @@ func newBinance(cfg *CEXConfig, binanceUS bool) *binance { tradeUpdaters: make(map[int]chan *Trade), tradeIDNoncePrefix: encode.RandomBytes(10), reconnectChan: make(chan struct{}), + marketStreamResps: make(map[uint64]chan<- []string), } bnc.markets.Store(make(map[string]*bntypes.Market)) @@ -1547,8 +1551,27 @@ func (bnc *binance) handleMarketDataNote(b []byte) { bnc.log.Errorf("Error unmarshaling book note: %v", err) return } - if note == nil || note.Data == nil { - bnc.log.Debugf("No data in market data update: %s", string(b)) + if note == nil { + bnc.log.Debugf("Market data update does not parse to a note: %s", string(b)) + return + } + + if note.Data == nil { + var waitingResp bool + bnc.marketStreamRespsMtx.Lock() + if ch, exists := bnc.marketStreamResps[note.ID]; exists { + waitingResp = true + timeout := time.After(time.Second * 5) + select { + case ch <- note.Result: + case <-timeout: + bnc.log.Errorf("Noone waiting for market stream result id %d", note.ID) + } + } + bnc.marketStreamRespsMtx.Unlock() + if !waitingResp { + bnc.log.Debugf("No data in market data update: %s", string(b)) + } return } @@ -1616,14 +1639,99 @@ func (bnc *binance) subscribeToAdditionalMarketDataStream(ctx context.Context, b return nil } -func (bnc *binance) streamsQuery() string { +func (bnc *binance) streams() []string { bnc.booksMtx.RLock() defer bnc.booksMtx.RUnlock() streamNames := make([]string, 0, len(bnc.books)) for mktID := range bnc.books { streamNames = append(streamNames, marketDataStreamID(mktID)) } - return strings.Join(streamNames, "/") + return streamNames +} + +// checkSubs will query binance for current market subscriptions and compare +// that to what subscriptions we should have. If there is a discrepancy a +// warning is logged and the market subbed or unsubbed. +func (bnc *binance) checkSubs(ctx context.Context) error { + bnc.marketStreamMtx.Lock() + defer bnc.marketStreamMtx.Unlock() + streams := bnc.streams() + if len(streams) == 0 { + return nil + } + + method := "LIST_SUBSCRIPTIONS" + id := atomic.AddUint64(&subscribeID, 1) + + resp := make(chan []string, 1) + bnc.marketStreamRespsMtx.Lock() + bnc.marketStreamResps[id] = resp + bnc.marketStreamRespsMtx.Unlock() + + defer func() { + bnc.marketStreamRespsMtx.Lock() + delete(bnc.marketStreamResps, id) + bnc.marketStreamRespsMtx.Unlock() + }() + + req := &bntypes.StreamSubscription{ + Method: method, + ID: id, + } + + b, err := json.Marshal(req) + if err != nil { + return fmt.Errorf("error marshaling subscription stream request: %w", err) + } + + bnc.log.Debugf("Sending %v", method) + if err := bnc.marketStream.SendRaw(b); err != nil { + return fmt.Errorf("error sending subscription stream request: %w", err) + } + + timeout := time.After(time.Second * 5) + var subs []string + select { + case subs = <-resp: + case <-timeout: + return fmt.Errorf("market stream result id %d did not come.", id) + case <-ctx.Done(): + return nil + } + + var sub []string + unsub := make([]string, len(subs)) + for i, s := range subs { + unsub[i] = strings.ToLower(s) + } + +out: + for _, us := range streams { + for i, them := range unsub { + if us == them { + unsub[i] = unsub[len(unsub)-1] + unsub = unsub[:len(unsub)-1] + continue out + } + } + sub = append(sub, us) + } + + for _, s := range sub { + bnc.log.Warnf("Subbing to previously unsubbed stream %s", s) + if err := bnc.subUnsubDepth(true, s); err != nil { + bnc.log.Errorf("Error subscribing to %s: %v", s, err) + } + } + + for _, s := range unsub { + bnc.log.Warnf("Unsubbing to previously subbed stream %s", s) + if err := bnc.subUnsubDepth(false, s); err != nil { + bnc.log.Errorf("Error unsubscribing to %s: %v", s, err) + } + } + + return nil } // connectToMarketDataStream is called when the first market is subscribed to. @@ -1635,7 +1743,7 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote reconnectC := make(chan struct{}) newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) { - addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, bnc.streamsQuery()) + addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(bnc.streams(), "/")) // Need to send key but not signature conn, err := comms.NewWsConn(&comms.WsCfg{ URL: addr, @@ -1712,6 +1820,8 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote return nil } + checkSubsInterval := time.Minute * 30 + checkSubs := time.After(checkSubsInterval) reconnectTimer := time.After(time.Hour * 12) for { select { @@ -1719,15 +1829,24 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote if err := reconnect(); err != nil { bnc.log.Errorf("Error reconnecting: %v", err) reconnectTimer = time.After(time.Second * 30) + checkSubs = make(<-chan time.Time) continue } + checkSubs = time.After(checkSubsInterval) case <-reconnectTimer: if err := reconnect(); err != nil { bnc.log.Errorf("Error refreshing connection: %v", err) reconnectTimer = time.After(time.Second * 30) + checkSubs = make(<-chan time.Time) continue } reconnectTimer = time.After(time.Hour * 12) + checkSubs = time.After(checkSubsInterval) + case <-checkSubs: + if err := bnc.checkSubs(ctx); err != nil { + bnc.log.Errorf("Error checking subscriptions: %v", err) + } + checkSubs = time.After(checkSubsInterval) case <-ctx.Done(): bnc.marketStreamMtx.Lock() bnc.marketStream = nil diff --git a/client/mm/libxc/bntypes/types.go b/client/mm/libxc/bntypes/types.go index 72f9ec1271..75c4150322 100644 --- a/client/mm/libxc/bntypes/types.go +++ b/client/mm/libxc/bntypes/types.go @@ -77,6 +77,8 @@ type BookUpdate struct { type BookNote struct { StreamName string `json:"stream"` Data *BookUpdate `json:"data"` + ID uint64 `json:"id"` + Result []string `json:"result"` } type WSBalance struct {