Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: protect epoch checksum with its own mutex #2977

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 57 additions & 18 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ type dexConnection struct {
// processed by a dex server.
inFlightOrders map[uint64]*InFlightOrder

// A map linking cancel order IDs to trade order IDs.
cancelsMtx sync.RWMutex
cancels map[order.OrderID]order.OrderID

blindCancelsMtx sync.Mutex
blindCancels map[order.OrderID]order.Preimage

Expand Down Expand Up @@ -253,6 +257,25 @@ func (dc *dexConnection) bondAssets() (map[uint32]*BondAsset, uint64) {
return bondAssets, cfg.BondExpiry
}

func (dc *dexConnection) registerCancelLink(cid, oid order.OrderID) {
dc.cancelsMtx.Lock()
dc.cancels[cid] = oid
dc.cancelsMtx.Unlock()
}

func (dc *dexConnection) deleteCancelLink(cid order.OrderID) {
dc.cancelsMtx.Lock()
delete(dc.cancels, cid)
dc.cancelsMtx.Unlock()
}

func (dc *dexConnection) cancelTradeID(cid order.OrderID) (order.OrderID, bool) {
dc.cancelsMtx.RLock()
defer dc.cancelsMtx.RUnlock()
oid, found := dc.cancels[cid]
return oid, found
}

// marketConfig is the market's configuration, as returned by the server in the
// 'config' response.
func (dc *dexConnection) marketConfig(mktID string) *msgjson.Market {
Expand Down Expand Up @@ -584,10 +607,12 @@ func (dc *dexConnection) findOrder(oid order.OrderID) (tracker *trackedTrade, pr
if tracker, found := dc.trades[oid]; found {
return tracker, tracker.preImg, false
}
// Search the cancel order IDs.
for _, tracker := range dc.trades {
if tracker.cancel != nil && tracker.cancel.ID() == oid {
return tracker, tracker.cancel.preImg, true

if tid, found := dc.cancelTradeID(oid); found {
if tracker, found := dc.trades[tid]; found {
return tracker, tracker.preImg, true
} else {
dc.log.Errorf("Did not find trade for cancel order ID %s", oid)
}
}
return
Expand Down Expand Up @@ -8087,6 +8112,7 @@ func (c *Core) newDEXConnection(acctInfo *db.AccountInfo, flag connectDEXFlag) (
ticker: newDexTicker(defaultTickInterval), // updated when server config obtained
books: make(map[string]*bookie),
trades: make(map[order.OrderID]*trackedTrade),
cancels: make(map[order.OrderID]order.OrderID),
inFlightOrders: make(map[uint64]*InFlightOrder),
blindCancels: make(map[order.OrderID]order.Preimage),
apiVer: -1,
Expand Down Expand Up @@ -8899,7 +8925,7 @@ func handlePreimageRequest(c *Core, dc *dexConnection, msg *msgjson.Message) err
}

if len(req.Commitment) != order.CommitmentSize {
return fmt.Errorf("received preimage request for %v with no corresponding order submission response.", oid)
return fmt.Errorf("received preimage request for %s with no corresponding order submission response", oid)
}

// See if we recognize that commitment, and if we do, just wait for the
Expand Down Expand Up @@ -8992,15 +9018,14 @@ func acceptCsum(tracker *trackedTrade, isCancel bool, commitChecksum dex.Bytes)
// Do not allow csum to be changed once it has been committed to
// (initialized to something other than `nil`) because it is probably a
// malicious behavior by the server.
tracker.mtx.Lock()
defer tracker.mtx.Unlock()

tracker.csumMtx.Lock()
defer tracker.csumMtx.Unlock()
if isCancel {
if tracker.cancel.csum == nil {
tracker.cancel.csum = commitChecksum
if tracker.cancelCsum == nil {
tracker.cancelCsum = commitChecksum
return true
}
return bytes.Equal(commitChecksum, tracker.cancel.csum)
return bytes.Equal(commitChecksum, tracker.cancelCsum)
}
if tracker.csum == nil {
tracker.csum = commitChecksum
Expand Down Expand Up @@ -10721,25 +10746,39 @@ func (c *Core) checkEpochResolution(host string, mktID string) {
}
currentEpoch := dc.marketEpoch(mktID, time.Now())
lastEpoch := currentEpoch - 1

// Short path if we're already resolved.
dc.epochMtx.RLock()
resolvedEpoch := dc.resolvedEpoch[mktID]
dc.epochMtx.RUnlock()
if lastEpoch == resolvedEpoch {
return
}

ts, inFlights := dc.marketTrades(mktID)
for _, ord := range inFlights {
if ord.Epoch == lastEpoch {
return
}
}
for _, t := range ts {
// Is this order from the last epoch and still not booked or executed?
if t.epochIdx() == lastEpoch && t.status() == order.OrderStatusEpoch {
return
}
if t.cancel != nil && t.cancelEpochIdx() == lastEpoch {
t.mtx.RLock()
matched := t.cancel.matches.taker != nil
t.mtx.RUnlock()
if !matched {
return
}
// Does this order have an in-flight cancel order that is not yet
// resolved?
t.mtx.RLock()
unresolvedCancel := t.cancel != nil && t.cancelEpochIdx() == lastEpoch && t.cancel.matches.taker == nil
t.mtx.RUnlock()
if unresolvedCancel {
return
}
}

// We don't have any unresolved orders or cancel orders from the last epoch.
// Just make sure that not other thread has resolved the epoch and then send
// the notification.
dc.epochMtx.Lock()
sendUpdate := lastEpoch > dc.resolvedEpoch[mktID]
dc.resolvedEpoch[mktID] = lastEpoch
Expand Down
Loading
Loading