Skip to content

Commit

Permalink
binance: Desync books on disconnect.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoeGruffins committed Sep 23, 2024
1 parent 8f88af2 commit 56ed9ba
Showing 1 changed file with 21 additions and 2 deletions.
23 changes: 21 additions & 2 deletions client/mm/libxc/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type binanceOrderBook struct {
baseConversionFactor uint64
quoteConversionFactor uint64
log dex.Logger

disconnectedChan chan struct{}
}

func newBinanceOrderBook(
Expand All @@ -86,6 +88,7 @@ func newBinanceOrderBook(
quoteConversionFactor: quoteConversionFactor,
log: log,
getSnapshot: getSnapshot,
disconnectedChan: make(chan struct{}),
}
}

Expand Down Expand Up @@ -288,6 +291,10 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error
if retry != nil { // don't hammer
continue
}
case <-b.disconnectedChan:
b.log.Debugf("Unsyncing %s orderbook due to disconnect. Trying reconnect in %s", b.mktID, retryFrequency)
desync()
retry = time.After(retryFrequency)

Check failure on line 297 in client/mm/libxc/binance.go

View workflow job for this annotation

GitHub Actions / Go CI (1.22)

ineffectual assignment to retry (ineffassign)

Check failure on line 297 in client/mm/libxc/binance.go

View workflow job for this annotation

GitHub Actions / Go CI (1.23)

ineffectual assignment to retry (ineffassign)
case <-ctx.Done():
return
}
Expand All @@ -296,7 +303,7 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error
b.log.Infof("Synced %s orderbook", b.mktID)
retry = nil
} else {
b.log.Infof("Failed to sync %s orderbook. Trying again in %s", b.mktID, retryFrequency)
b.log.Infof("Failed to sync %s orderbook. Trying again in %s", b.mktID, retryFrequency)
desync() // Clears the syncCache
retry = time.After(retryFrequency)
}
Expand Down Expand Up @@ -1745,6 +1752,18 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) {
addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(bnc.streams(), "/"))
// Need to send key but not signature
connectEventFunc := func(cs comms.ConnectionStatus) {
if cs != comms.Disconnected {
return
}
// If disconnected, set all books to unsynced so bots
// will not place new orders.
bnc.booksMtx.RLock()
defer bnc.booksMtx.RLock()
for _, b := range bnc.books {
b.disconnectedChan <- struct{}{}
}
}
conn, err := comms.NewWsConn(&comms.WsCfg{
URL: addr,
// Binance Docs: The websocket server will send a ping frame every 3
Expand All @@ -1759,7 +1778,7 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
default:
}
},
ConnectEventFunc: func(cs comms.ConnectionStatus) {},
ConnectEventFunc: connectEventFunc,
Logger: bnc.log.SubLogger("BNCBOOK"),
RawHandler: bnc.handleMarketDataNote,
})
Expand Down

0 comments on commit 56ed9ba

Please sign in to comment.