Skip to content

Commit

Permalink
binance: Retry keep alive.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoeGruffins committed Sep 6, 2024
1 parent 61a6f8c commit 7b64763
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 42 deletions.
97 changes: 67 additions & 30 deletions client/mm/libxc/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,9 @@ type binance struct {
tradeInfo map[string]*tradeInfo
tradeUpdaters map[int]chan *Trade
tradeUpdateCounter int

listenKey atomic.Value // string
reconnectChan chan struct{}
}

var _ CEX = (*binance)(nil)
Expand Down Expand Up @@ -521,9 +524,11 @@ func newBinance(cfg *CEXConfig, binanceUS bool) *binance {
tradeInfo: make(map[string]*tradeInfo),
tradeUpdaters: make(map[int]chan *Trade),
tradeIDNoncePrefix: encode.RandomBytes(10),
reconnectChan: make(chan struct{}),
}

bnc.markets.Store(make(map[string]*bntypes.Market))
bnc.listenKey.Store("")

return bnc
}
Expand Down Expand Up @@ -1358,6 +1363,13 @@ func (bnc *binance) handleExecutionReport(update *bntypes.StreamUpdate) {
}
}

func (bnc *binance) handleListenKeyExpired(update *bntypes.StreamUpdate) {
bnc.log.Debugf("Received listenKeyExpired: %+v", update)
expireTime := time.Unix(update.E/1000, 0)
bnc.log.Errorf("Listen key %v expired at %v. Attempting to reconnect and get a new one.", update.ListenKey, expireTime)
bnc.reconnectChan <- struct{}{}
}

func (bnc *binance) handleUserDataStreamUpdate(b []byte) {
bnc.log.Tracef("Received user data stream update: %s", string(b))

Expand All @@ -1372,19 +1384,23 @@ func (bnc *binance) handleUserDataStreamUpdate(b []byte) {
bnc.handleOutboundAccountPosition(msg)
case "executionReport":
bnc.handleExecutionReport(msg)
case "listenKeyExpired":
bnc.handleListenKeyExpired(msg)
}
}

func (bnc *binance) getListenID(ctx context.Context) (string, error) {
var resp *bntypes.DataStreamKey
return resp.ListenKey, bnc.postAPI(ctx, "/api/v3/userDataStream", nil, nil, true, false, &resp)
if err := bnc.postAPI(ctx, "/api/v3/userDataStream", nil, nil, true, false, &resp); err != nil {
return "", err
}
bnc.listenKey.Store(resp.ListenKey)
return resp.ListenKey, nil
}

func (bnc *binance) getUserDataStream(ctx context.Context) (err error) {
var listenKey string

newConn := func() (*dex.ConnectionMaster, error) {
listenKey, err = bnc.getListenID(ctx)
listenKey, err := bnc.getListenID(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1426,37 +1442,58 @@ func (bnc *binance) getUserDataStream(ctx context.Context) (err error) {
keepAlive := time.NewTicker(time.Minute * 30)
defer keepAlive.Stop()

retryKeepAlive := make(<-chan time.Time)

connected := true // do not keep alive on a failed connection

doReconnect := func() {
if cm != nil {
cm.Disconnect()
}
cm, err = newConn()
if err != nil {
connected = false
bnc.log.Errorf("Error reconnecting: %v", err)
reconnect = time.After(time.Second * 30)
} else {
connected = true
reconnect = time.After(time.Hour * 12)
}
}

doKeepAlive := func() {
if !connected {
bnc.log.Warn("Cannot keep binance connection alive because we are disconnected. Trying again in 10 seconds.")
retryKeepAlive = time.After(time.Second * 10)
return
}
q := make(url.Values)
q.Add("listenKey", bnc.listenKey.Load().(string))
// Doing a PUT on a listenKey will extend its validity for 60 minutes.
req, err := bnc.generateRequest(ctx, http.MethodPut, "/api/v3/userDataStream", q, nil, true, false)
if err != nil {
bnc.log.Errorf("Error generating keep-alive request: %v. Trying again in 10 seconds.", err)
retryKeepAlive = time.After(time.Second * 10)
return
}
if err := requestInto(req, nil); err != nil {
bnc.log.Errorf("Error sending keep-alive request: %v. Trying again in 10 seconds", err)
retryKeepAlive = time.After(time.Second * 10)
return
}
bnc.log.Debug("Binance connection keep alive sent successfully.")
}

for {
select {
case <-bnc.reconnectChan:
doReconnect()
case <-reconnect:
if cm != nil {
cm.Disconnect()
}
cm, err = newConn()
if err != nil {
connected = false
bnc.log.Errorf("Error reconnecting: %v", err)
reconnect = time.After(time.Second * 30)
} else {
connected = true
reconnect = time.After(time.Hour * 12)
}
doReconnect()
case <-retryKeepAlive:
doKeepAlive()
case <-keepAlive.C:
if !connected {
continue
}
q := make(url.Values)
q.Add("listenKey", listenKey)
// Doing a PUT on a listenKey will extend its validity for 60 minutes.
req, err := bnc.generateRequest(ctx, http.MethodPut, "/api/v3/userDataStream", q, nil, true, false)
if err != nil {
bnc.log.Errorf("Error generating keep-alive request: %v", err)
continue
}
if err := requestInto(req, nil); err != nil {
bnc.log.Errorf("Error sending keep-alive request: %v", err)
}
doKeepAlive()
case <-ctx.Done():
return
}
Expand Down
25 changes: 13 additions & 12 deletions client/mm/libxc/bntypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,19 @@ type WSBalance struct {
}

type StreamUpdate struct {
Asset string `json:"a"`
EventType string `json:"e"`
ClientOrderID string `json:"c"`
CurrentOrderStatus string `json:"X"`
Balances []*WSBalance `json:"B"`
BalanceDelta float64 `json:"d,string"`
Filled float64 `json:"z,string"`
QuoteFilled float64 `json:"Z,string"`
OrderQty float64 `json:"q,string"`
QuoteOrderQty float64 `json:"Q,string"`
CancelledOrderID string `json:"C"`
E json.RawMessage `json:"E"`
Asset string `json:"a"`
EventType string `json:"e"`
ClientOrderID string `json:"c"`
CurrentOrderStatus string `json:"X"`
Balances []*WSBalance `json:"B"`
BalanceDelta float64 `json:"d,string"`
Filled float64 `json:"z,string"`
QuoteFilled float64 `json:"Z,string"`
OrderQty float64 `json:"q,string"`
QuoteOrderQty float64 `json:"Q,string"`
CancelledOrderID string `json:"C"`
E int64 `json:"E"`
ListenKey string `json:"listenKey"`
}

type RateLimit struct {
Expand Down

0 comments on commit 7b64763

Please sign in to comment.