From 7b64763cb4fcf778f7f57c4e3358da633cb520f8 Mon Sep 17 00:00:00 2001 From: JoeGruff Date: Fri, 6 Sep 2024 16:48:21 +0900 Subject: [PATCH] binance: Retry keep alive. --- client/mm/libxc/binance.go | 97 ++++++++++++++++++++++---------- client/mm/libxc/bntypes/types.go | 25 ++++---- 2 files changed, 80 insertions(+), 42 deletions(-) diff --git a/client/mm/libxc/binance.go b/client/mm/libxc/binance.go index b5c3df60a8..88fae302d3 100644 --- a/client/mm/libxc/binance.go +++ b/client/mm/libxc/binance.go @@ -476,6 +476,9 @@ type binance struct { tradeInfo map[string]*tradeInfo tradeUpdaters map[int]chan *Trade tradeUpdateCounter int + + listenKey atomic.Value // string + reconnectChan chan struct{} } var _ CEX = (*binance)(nil) @@ -521,9 +524,11 @@ func newBinance(cfg *CEXConfig, binanceUS bool) *binance { tradeInfo: make(map[string]*tradeInfo), tradeUpdaters: make(map[int]chan *Trade), tradeIDNoncePrefix: encode.RandomBytes(10), + reconnectChan: make(chan struct{}), } bnc.markets.Store(make(map[string]*bntypes.Market)) + bnc.listenKey.Store("") return bnc } @@ -1358,6 +1363,13 @@ func (bnc *binance) handleExecutionReport(update *bntypes.StreamUpdate) { } } +func (bnc *binance) handleListenKeyExpired(update *bntypes.StreamUpdate) { + bnc.log.Debugf("Received listenKeyExpired: %+v", update) + expireTime := time.Unix(update.E/1000, 0) + bnc.log.Errorf("Listen key %v expired at %v. Attempting to reconnect and get a new one.", update.ListenKey, expireTime) + bnc.reconnectChan <- struct{}{} +} + func (bnc *binance) handleUserDataStreamUpdate(b []byte) { bnc.log.Tracef("Received user data stream update: %s", string(b)) @@ -1372,19 +1384,23 @@ func (bnc *binance) handleUserDataStreamUpdate(b []byte) { bnc.handleOutboundAccountPosition(msg) case "executionReport": bnc.handleExecutionReport(msg) + case "listenKeyExpired": + bnc.handleListenKeyExpired(msg) } } func (bnc *binance) getListenID(ctx context.Context) (string, error) { var resp *bntypes.DataStreamKey - return resp.ListenKey, bnc.postAPI(ctx, "/api/v3/userDataStream", nil, nil, true, false, &resp) + if err := bnc.postAPI(ctx, "/api/v3/userDataStream", nil, nil, true, false, &resp); err != nil { + return "", err + } + bnc.listenKey.Store(resp.ListenKey) + return resp.ListenKey, nil } func (bnc *binance) getUserDataStream(ctx context.Context) (err error) { - var listenKey string - newConn := func() (*dex.ConnectionMaster, error) { - listenKey, err = bnc.getListenID(ctx) + listenKey, err := bnc.getListenID(ctx) if err != nil { return nil, err } @@ -1426,37 +1442,58 @@ func (bnc *binance) getUserDataStream(ctx context.Context) (err error) { keepAlive := time.NewTicker(time.Minute * 30) defer keepAlive.Stop() + retryKeepAlive := make(<-chan time.Time) + connected := true // do not keep alive on a failed connection + + doReconnect := func() { + if cm != nil { + cm.Disconnect() + } + cm, err = newConn() + if err != nil { + connected = false + bnc.log.Errorf("Error reconnecting: %v", err) + reconnect = time.After(time.Second * 30) + } else { + connected = true + reconnect = time.After(time.Hour * 12) + } + } + + doKeepAlive := func() { + if !connected { + bnc.log.Warn("Cannot keep binance connection alive because we are disconnected. Trying again in 10 seconds.") + retryKeepAlive = time.After(time.Second * 10) + return + } + q := make(url.Values) + q.Add("listenKey", bnc.listenKey.Load().(string)) + // Doing a PUT on a listenKey will extend its validity for 60 minutes. + req, err := bnc.generateRequest(ctx, http.MethodPut, "/api/v3/userDataStream", q, nil, true, false) + if err != nil { + bnc.log.Errorf("Error generating keep-alive request: %v. Trying again in 10 seconds.", err) + retryKeepAlive = time.After(time.Second * 10) + return + } + if err := requestInto(req, nil); err != nil { + bnc.log.Errorf("Error sending keep-alive request: %v. Trying again in 10 seconds", err) + retryKeepAlive = time.After(time.Second * 10) + return + } + bnc.log.Debug("Binance connection keep alive sent successfully.") + } + for { select { + case <-bnc.reconnectChan: + doReconnect() case <-reconnect: - if cm != nil { - cm.Disconnect() - } - cm, err = newConn() - if err != nil { - connected = false - bnc.log.Errorf("Error reconnecting: %v", err) - reconnect = time.After(time.Second * 30) - } else { - connected = true - reconnect = time.After(time.Hour * 12) - } + doReconnect() + case <-retryKeepAlive: + doKeepAlive() case <-keepAlive.C: - if !connected { - continue - } - q := make(url.Values) - q.Add("listenKey", listenKey) - // Doing a PUT on a listenKey will extend its validity for 60 minutes. - req, err := bnc.generateRequest(ctx, http.MethodPut, "/api/v3/userDataStream", q, nil, true, false) - if err != nil { - bnc.log.Errorf("Error generating keep-alive request: %v", err) - continue - } - if err := requestInto(req, nil); err != nil { - bnc.log.Errorf("Error sending keep-alive request: %v", err) - } + doKeepAlive() case <-ctx.Done(): return } diff --git a/client/mm/libxc/bntypes/types.go b/client/mm/libxc/bntypes/types.go index 1f4515fde2..72f9ec1271 100644 --- a/client/mm/libxc/bntypes/types.go +++ b/client/mm/libxc/bntypes/types.go @@ -86,18 +86,19 @@ type WSBalance struct { } type StreamUpdate struct { - Asset string `json:"a"` - EventType string `json:"e"` - ClientOrderID string `json:"c"` - CurrentOrderStatus string `json:"X"` - Balances []*WSBalance `json:"B"` - BalanceDelta float64 `json:"d,string"` - Filled float64 `json:"z,string"` - QuoteFilled float64 `json:"Z,string"` - OrderQty float64 `json:"q,string"` - QuoteOrderQty float64 `json:"Q,string"` - CancelledOrderID string `json:"C"` - E json.RawMessage `json:"E"` + Asset string `json:"a"` + EventType string `json:"e"` + ClientOrderID string `json:"c"` + CurrentOrderStatus string `json:"X"` + Balances []*WSBalance `json:"B"` + BalanceDelta float64 `json:"d,string"` + Filled float64 `json:"z,string"` + QuoteFilled float64 `json:"Z,string"` + OrderQty float64 `json:"q,string"` + QuoteOrderQty float64 `json:"Q,string"` + CancelledOrderID string `json:"C"` + E int64 `json:"E"` + ListenKey string `json:"listenKey"` } type RateLimit struct {