Skip to content

Commit

Permalink
Merge pull request #395 from zenhack/handleReturn-locking
Browse files Browse the repository at this point in the history
Clean up/fix locking in handleReturn
  • Loading branch information
zenhack committed Dec 21, 2022
2 parents 6b08920 + eeb7862 commit 73ff1cb
Showing 1 changed file with 48 additions and 56 deletions.
104 changes: 48 additions & 56 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,71 +978,63 @@ func (c *Conn) handleReturn(ctx context.Context, ret rpccp.Return, release capnp
c.er.ReportError(rpcerr.Annotate(pr.err, "incoming return"))
}

if q.bootstrapPromise == nil && pr.err == nil {
// The result of the message contains actual data (not just a
// client or an error), so we save the ReleaseFunc for later:
q.release = release
}
c.lk.Unlock()
// We're going to potentially block fulfilling some promises so fork
// off a goroutine to avoid blocking the receive loop.
//
// TODO(cleanup): This is a bit weird in that we hold the lock across
// the go statement, and do the unlock in the new goroutine, but before
// we actually block. This was less weird when the go statement wasn't
// there, and we should rework this so it's easier to understand what's
// going on.
go func() {
switch {
case q.bootstrapPromise != nil:
syncutil.Without(&c.lk, func() {
q.p.Resolve(pr.result, pr.err)
q.bootstrapPromise.Fulfill(q.p.Answer().Client())
q.p.ReleaseClients()
release()
})
case pr.err != nil:
// TODO(someday): send unimplemented message back to remote if
// pr.unimplemented == true.
syncutil.Without(&c.lk, func() {
q.p.Reject(pr.err)
release()
})
default:
q.release = release
syncutil.Without(&c.lk, func() {
q.p.Fulfill(pr.result)
})
q.p.Resolve(pr.result, pr.err)
if q.bootstrapPromise != nil {
q.bootstrapPromise.Fulfill(q.p.Answer().Client())
q.p.ReleaseClients()
// We can release now; root pointer of the result is a client, so the
// message won't be accessed:
release()
} else if pr.err != nil {
// We can release now; the result is an error, so data from the message
// won't be accessed:
release()
}
c.lk.Unlock()

// Send disembargoes. Failing to send one of these just never lifts
// the embargo on our side, but doesn't cause a leak.
//
// TODO(soon): make embargo resolve to error client.
for _, s := range pr.disembargoes {
c.sendMessage(ctx, s.buildDisembargo, func(err error) {
syncutil.With(&c.lk, func() {
// Send disembargoes. Failing to send one of these just never lifts
// the embargo on our side, but doesn't cause a leak.
//
// TODO(soon): make embargo resolve to error client.
for _, s := range pr.disembargoes {
c.sendMessage(ctx, s.buildDisembargo, func(err error) {
if err != nil {
err = fmt.Errorf("incoming return: send disembargo: %w", err)
c.er.ReportError(err)
}
})
}

// Send finish.
c.sendMessage(ctx, func(m rpccp.Message) error {
fin, err := m.NewFinish()
if err == nil {
fin.SetQuestionId(uint32(qid))
fin.SetReleaseResultCaps(false)
}
return err
}, func(err error) {
c.lk.Lock()
defer c.lk.Unlock()
defer close(q.finishMsgSend)

if err != nil {
err = fmt.Errorf("incoming return: send disembargo: %w", err)
err = fmt.Errorf("incoming return: send finish: build message: %w", err)
c.er.ReportError(err)
} else {
q.flags |= finishSent
c.lk.questionID.remove(uint32(qid))
}
})
}

// Send finish.
c.sendMessage(ctx, func(m rpccp.Message) error {
fin, err := m.NewFinish()
if err == nil {
fin.SetQuestionId(uint32(qid))
fin.SetReleaseResultCaps(false)
}
return err
}, func(err error) {
c.lk.Lock()
defer c.lk.Unlock()
defer close(q.finishMsgSend)

if err != nil {
err = fmt.Errorf("incoming return: send finish: build message: %w", err)
c.er.ReportError(err)
} else {
q.flags |= finishSent
c.lk.questionID.remove(uint32(qid))
}
})
}()

Expand Down

0 comments on commit 73ff1cb

Please sign in to comment.