diff --git a/rpc/rpc.go b/rpc/rpc.go index d31fe5d3..72805baa 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -1000,39 +1000,41 @@ func (c *Conn) handleReturn(ctx context.Context, ret rpccp.Return, release capnp release() } - // Send disembargoes. Failing to send one of these just never lifts - // the embargo on our side, but doesn't cause a leak. - // - // TODO(soon): make embargo resolve to error client. - for _, s := range pr.disembargoes { - c.sendMessage(ctx, s.buildDisembargo, func(err error) { + syncutil.With(&c.lk, func() { + // Send disembargoes. Failing to send one of these just never lifts + // the embargo on our side, but doesn't cause a leak. + // + // TODO(soon): make embargo resolve to error client. + for _, s := range pr.disembargoes { + c.sendMessage(ctx, s.buildDisembargo, func(err error) { + if err != nil { + err = fmt.Errorf("incoming return: send disembargo: %w", err) + c.er.ReportError(err) + } + }) + } + + // Send finish. + c.sendMessage(ctx, func(m rpccp.Message) error { + fin, err := m.NewFinish() + if err == nil { + fin.SetQuestionId(uint32(qid)) + fin.SetReleaseResultCaps(false) + } + return err + }, func(err error) { + c.lk.Lock() + defer c.lk.Unlock() + defer close(q.finishMsgSend) + if err != nil { - err = fmt.Errorf("incoming return: send disembargo: %w", err) + err = fmt.Errorf("incoming return: send finish: build message: %w", err) c.er.ReportError(err) + } else { + q.flags |= finishSent + c.lk.questionID.remove(uint32(qid)) } }) - } - - // Send finish. - c.sendMessage(ctx, func(m rpccp.Message) error { - fin, err := m.NewFinish() - if err == nil { - fin.SetQuestionId(uint32(qid)) - fin.SetReleaseResultCaps(false) - } - return err - }, func(err error) { - c.lk.Lock() - defer c.lk.Unlock() - defer close(q.finishMsgSend) - - if err != nil { - err = fmt.Errorf("incoming return: send finish: build message: %w", err) - c.er.ReportError(err) - } else { - q.flags |= finishSent - c.lk.questionID.remove(uint32(qid)) - } }) }()