Skip to content

Commit

Permalink
releaseExport: take a deferred.Queue
Browse files Browse the repository at this point in the history
No functional change, but this pushes the details of cleanup into
releaseExport, which will make the details of easier to modify.
  • Loading branch information
zenhack committed Jun 30, 2023
1 parent 5c0a14e commit e863db4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 33 deletions.
39 changes: 16 additions & 23 deletions rpc/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,12 @@ func (c *lockedConn) findExport(id exportID) *expent {

// releaseExport decreases the number of wire references to an export
// by a given number. If the export's reference count reaches zero,
// then releaseExport will pop export from the table and return the
// export's ClientSnapshot. The caller is responsible for releasing
// the snapshot once the caller is no longer holding onto c.mu.
func (c *lockedConn) releaseExport(id exportID, count uint32) (capnp.ClientSnapshot, error) {
// then releaseExport will pop export from the table and schedule further
// cleanup (such as releasing snaphost) via dq.
func (c *lockedConn) releaseExport(dq *deferred.Queue, id exportID, count uint32) error {
ent := c.findExport(id)
if ent == nil {
return capnp.ClientSnapshot{}, rpcerr.Failed(errors.New("unknown export ID " + str.Utod(id)))
return rpcerr.Failed(errors.New("unknown export ID " + str.Utod(id)))
}
switch {
case count == ent.wireRefs:
Expand All @@ -78,29 +77,23 @@ func (c *lockedConn) releaseExport(id exportID, count uint32) (capnp.ClientSnaps
c.clearExportID(metadata)
})
}
return snapshot, nil
dq.Defer(snapshot.Release)
return nil
case count > ent.wireRefs:
return capnp.ClientSnapshot{}, rpcerr.Failed(errors.New("export ID " + str.Utod(id) + " released too many references"))
return rpcerr.Failed(errors.New("export ID " + str.Utod(id) + " released too many references"))
default:
ent.wireRefs -= count
return capnp.ClientSnapshot{}, nil
return nil
}
}

func (c *lockedConn) releaseExportRefs(dq *deferred.Queue, refs map[exportID]uint32) error {
var firstErr error
for id, count := range refs {
snapshot, err := c.releaseExport(id, count)
if err != nil {
if firstErr == nil {
firstErr = err
}
continue
}
if (snapshot == capnp.ClientSnapshot{}) {
continue
err := c.releaseExport(dq, id, count)
if err != nil && firstErr == nil {
firstErr = err
}
dq.Defer(snapshot.Release)
}
return firstErr
}
Expand Down Expand Up @@ -236,19 +229,19 @@ func (c *lockedConn) sendSenderPromise(id exportID, d rpccp.CapDescriptor) {
}, func(err error) {
sendRef.Release()
if err != nil && isExport {
dq := &deferred.Queue{}
defer dq.Run()
// release 1 ref of the thing it resolved to.
snapshot, err := withLockedConn2(
err := withLockedConn1(
unlockedConn,
func(c *lockedConn) (capnp.ClientSnapshot, error) {
return c.releaseExport(resolvedID, 1)
func(c *lockedConn) error {
return c.releaseExport(dq, resolvedID, 1)
},
)
if err != nil {
c.er.ReportError(
exc.WrapError("releasing export due to failure to send resolve", err),
)
} else {
snapshot.Release()
}
}
})
Expand Down
19 changes: 9 additions & 10 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,14 +669,12 @@ func (c *Conn) handleUnimplemented(in transport.IncomingMessage) error {
default:
return nil
}
snapshot, err := withLockedConn2(c, func(c *lockedConn) (capnp.ClientSnapshot, error) {
return c.releaseExport(id, 1)
dq := &deferred.Queue{}
defer dq.Run()
err = withLockedConn1(c, func(c *lockedConn) error {
return c.releaseExport(dq, id, 1)
})
if err != nil {
return err
}
snapshot.Release()
return nil
return err
}
}
// For other cases we should just ignore the message.
Expand Down Expand Up @@ -1532,13 +1530,14 @@ func (c *Conn) handleRelease(ctx context.Context, in transport.IncomingMessage)
id := exportID(rel.Id())
count := rel.ReferenceCount()

snapshot, err := withLockedConn2(c, func(c *lockedConn) (capnp.ClientSnapshot, error) {
return c.releaseExport(id, count)
dq := &deferred.Queue{}
defer dq.Run()
err = withLockedConn1(c, func(c *lockedConn) error {
return c.releaseExport(dq, id, count)
})
if err != nil {
return rpcerr.Annotate(err, "incoming release")
}
snapshot.Release() // no-ops for nil
return nil
}

Expand Down

0 comments on commit e863db4

Please sign in to comment.