Skip to content

Commit

Permalink
Merge pull request #393 from zenhack/releaseList-fixups
Browse files Browse the repository at this point in the history
Release list fixes & broader use
  • Loading branch information
lthibault authored Dec 17, 2022
2 parents 46a3777 + 0358e8f commit 6b08920
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 22 deletions.
8 changes: 3 additions & 5 deletions rpc/answer.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,17 @@ func (ans *answer) setBootstrap(c capnp.Client) error {
//
// The caller MUST NOT hold ans.c.lk.
func (ans *answer) Return(e error) {
rl := &releaseList{}
defer rl.Release()

ans.c.lk.Lock()
if e != nil {
rl := &releaseList{}
ans.sendException(rl, e)
ans.c.lk.Unlock()
rl.Release()
ans.pcalls.Wait()
ans.c.tasks.Done() // added by handleCall
return
}
rl := &releaseList{}
if err := ans.sendReturn(rl); err != nil {
select {
case <-ans.c.bgctx.Done():
Expand All @@ -204,13 +204,11 @@ func (ans *answer) Return(e error) {
}

ans.c.lk.Unlock()
rl.Release()
ans.pcalls.Wait()
return
}
}
ans.c.lk.Unlock()
rl.Release()
ans.pcalls.Wait()
ans.c.tasks.Done() // added by handleCall
}
Expand Down
7 changes: 4 additions & 3 deletions rpc/releaselist.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import "capnproto.org/go/capnp/v3"

type releaseList []capnp.ReleaseFunc

func (rl releaseList) Release() {
for i, r := range rl {
func (rl *releaseList) Release() {
funcs := *rl
for i, r := range funcs {
if r != nil {
r()
rl[i] = nil
funcs[i] = nil
}
}
}
Expand Down
21 changes: 7 additions & 14 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,9 @@ func (c *Conn) handleBootstrap(ctx context.Context, id answerID) error {
}

func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capnp.ReleaseFunc) error {
rl := &releaseList{}
defer rl.Release()

id := answerID(call.QuestionId())

// TODO(3rd-party handshake): support sending results to 3rd party vat
Expand Down Expand Up @@ -722,11 +725,9 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn
c.lk.answers[id] = ans
if parseErr != nil {
parseErr = rpcerr.Annotate(parseErr, "incoming call")
rl := &releaseList{}
ans.sendException(rl, parseErr)
c.lk.Unlock()
c.er.ReportError(parseErr)
rl.Release()
releaseCall()
return nil
}
Expand Down Expand Up @@ -778,10 +779,8 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn
}
if tgtAns.flags.Contains(resultsReady) {
if tgtAns.err != nil {
rl := &releaseList{}
ans.sendException(rl, tgtAns.err)
c.lk.Unlock()
rl.Release()
releaseCall()
return nil
}
Expand All @@ -792,21 +791,17 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn
content, err := tgtAns.results.Content()
if err != nil {
err = rpcerr.Failedf("incoming call: read results from target answer: %w", err)
rl := &releaseList{}
ans.sendException(rl, err)
c.lk.Unlock()
rl.Release()
releaseCall()
c.er.ReportError(err)
return nil
}
sub, err := capnp.Transform(content, p.target.transform)
if err != nil {
// Not reporting, as this is the caller's fault.
rl := &releaseList{}
ans.sendException(rl, err)
c.lk.Unlock()
rl.Release()
releaseCall()
return nil
}
Expand Down Expand Up @@ -1116,14 +1111,16 @@ type parsedReturn struct {
}

func (c *Conn) handleFinish(ctx context.Context, id answerID, releaseResultCaps bool) error {
rl := &releaseList{}
defer rl.Release()
c.lk.Lock()
defer c.lk.Unlock()

ans := c.lk.answers[id]
if ans == nil {
c.lk.Unlock()
return rpcerr.Failedf("incoming finish: unknown answer ID %d", id)
}
if ans.flags.Contains(finishReceived) {
c.lk.Unlock()
return rpcerr.Failedf("incoming finish: answer ID %d already received finish", id)
}
ans.flags |= finishReceived
Expand All @@ -1134,15 +1131,11 @@ func (c *Conn) handleFinish(ctx context.Context, id answerID, releaseResultCaps
ans.cancel()
}
if !ans.flags.Contains(returnSent) {
c.lk.Unlock()
return nil
}

// Return sent and finish received: time to destroy answer.
rl := &releaseList{}
err := ans.destroy(rl)
c.lk.Unlock()
rl.Release()
if err != nil {
return rpcerr.Annotate(err, "incoming finish: release result caps")
}
Expand Down

0 comments on commit 6b08920

Please sign in to comment.