diff --git a/rpc/export.go b/rpc/export.go index ca153181..eebf3e55 100644 --- a/rpc/export.go +++ b/rpc/export.go @@ -58,13 +58,12 @@ func (c *lockedConn) findExport(id exportID) *expent { // releaseExport decreases the number of wire references to an export // by a given number. If the export's reference count reaches zero, -// then releaseExport will pop export from the table and return the -// export's ClientSnapshot. The caller is responsible for releasing -// the snapshot once the caller is no longer holding onto c.mu. -func (c *lockedConn) releaseExport(id exportID, count uint32) (capnp.ClientSnapshot, error) { +// then releaseExport will pop export from the table and schedule further +// cleanup (such as releasing snaphost) via dq. +func (c *lockedConn) releaseExport(dq *deferred.Queue, id exportID, count uint32) error { ent := c.findExport(id) if ent == nil { - return capnp.ClientSnapshot{}, rpcerr.Failed(errors.New("unknown export ID " + str.Utod(id))) + return rpcerr.Failed(errors.New("unknown export ID " + str.Utod(id))) } switch { case count == ent.wireRefs: @@ -78,29 +77,23 @@ func (c *lockedConn) releaseExport(id exportID, count uint32) (capnp.ClientSnaps c.clearExportID(metadata) }) } - return snapshot, nil + dq.Defer(snapshot.Release) + return nil case count > ent.wireRefs: - return capnp.ClientSnapshot{}, rpcerr.Failed(errors.New("export ID " + str.Utod(id) + " released too many references")) + return rpcerr.Failed(errors.New("export ID " + str.Utod(id) + " released too many references")) default: ent.wireRefs -= count - return capnp.ClientSnapshot{}, nil + return nil } } func (c *lockedConn) releaseExportRefs(dq *deferred.Queue, refs map[exportID]uint32) error { var firstErr error for id, count := range refs { - snapshot, err := c.releaseExport(id, count) - if err != nil { - if firstErr == nil { - firstErr = err - } - continue - } - if (snapshot == capnp.ClientSnapshot{}) { - continue + err := c.releaseExport(dq, id, count) + if err != nil && firstErr == nil { + firstErr = err } - dq.Defer(snapshot.Release) } return firstErr } @@ -236,19 +229,19 @@ func (c *lockedConn) sendSenderPromise(id exportID, d rpccp.CapDescriptor) { }, func(err error) { sendRef.Release() if err != nil && isExport { + dq := &deferred.Queue{} + defer dq.Run() // release 1 ref of the thing it resolved to. - snapshot, err := withLockedConn2( + err := withLockedConn1( unlockedConn, - func(c *lockedConn) (capnp.ClientSnapshot, error) { - return c.releaseExport(resolvedID, 1) + func(c *lockedConn) error { + return c.releaseExport(dq, resolvedID, 1) }, ) if err != nil { c.er.ReportError( exc.WrapError("releasing export due to failure to send resolve", err), ) - } else { - snapshot.Release() } } }) diff --git a/rpc/rpc.go b/rpc/rpc.go index 839db35a..8aeec41e 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -669,14 +669,12 @@ func (c *Conn) handleUnimplemented(in transport.IncomingMessage) error { default: return nil } - snapshot, err := withLockedConn2(c, func(c *lockedConn) (capnp.ClientSnapshot, error) { - return c.releaseExport(id, 1) + dq := &deferred.Queue{} + defer dq.Run() + err = withLockedConn1(c, func(c *lockedConn) error { + return c.releaseExport(dq, id, 1) }) - if err != nil { - return err - } - snapshot.Release() - return nil + return err } } // For other cases we should just ignore the message. @@ -1532,13 +1530,14 @@ func (c *Conn) handleRelease(ctx context.Context, in transport.IncomingMessage) id := exportID(rel.Id()) count := rel.ReferenceCount() - snapshot, err := withLockedConn2(c, func(c *lockedConn) (capnp.ClientSnapshot, error) { - return c.releaseExport(id, count) + dq := &deferred.Queue{} + defer dq.Run() + err = withLockedConn1(c, func(c *lockedConn) error { + return c.releaseExport(dq, id, count) }) if err != nil { return rpcerr.Annotate(err, "incoming release") } - snapshot.Release() // no-ops for nil return nil }