Skip to content

Commit

Permalink
Merge pull request #396 from zenhack/handleDisembargo-locking
Browse files Browse the repository at this point in the history
Clean up/fix locking in handleDisembargo
  • Loading branch information
zenhack committed Dec 22, 2022
2 parents 73ff1cb + 182cf37 commit 34de47d
Showing 1 changed file with 36 additions and 40 deletions.
76 changes: 36 additions & 40 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1389,64 +1389,60 @@ func (c *Conn) handleDisembargo(ctx context.Context, d rpccp.Disembargo, release
return
}

client := iface.Client()

var ok bool
syncutil.Without(&c.lk, func() {
imp, ok = client.State().Brand.Value.(*importClient)
})

if !ok || imp.c != c {
client.Release()
err = rpcerr.Failedf("incoming disembargo: sender loopback requested on a capability that is not an import")
return
}

// TODO(maybe): check generation?
client = iface.Client()
})

if err != nil {
release()
return err
}

imp, ok := client.State().Brand.Value.(*importClient)
if !ok || imp.c != c {
client.Release()
return rpcerr.Failedf("incoming disembargo: sender loopback requested on a capability that is not an import")
}
// TODO(maybe): check generation?

// Since this Cap'n Proto RPC implementation does not send imports
// unless they are fully dequeued, we can just immediately loop back.
id := d.Context().SenderLoopback()
c.sendMessage(ctx, func(m rpccp.Message) error {
defer release()
defer client.Release()

d, err := m.NewDisembargo()
if err != nil {
return err
}
syncutil.With(&c.lk, func() {
c.sendMessage(ctx, func(m rpccp.Message) error {
d, err := m.NewDisembargo()
if err != nil {
return err
}

tgt, err := d.NewTarget()
if err != nil {
return err
}
tgt, err := d.NewTarget()
if err != nil {
return err
}

tgt.SetImportedCap(uint32(imp.id))
d.Context().SetReceiverLoopback(id)
return nil
tgt.SetImportedCap(uint32(imp.id))
d.Context().SetReceiverLoopback(id)
return nil

}, func(err error) {
c.er.ReportError(rpcerr.Annotatef(err, "incoming disembargo: send receiver loopback"))
}, func(err error) {
defer release()
defer client.Release()
c.er.ReportError(rpcerr.Annotatef(err, "incoming disembargo: send receiver loopback"))
})
})

default:
c.er.ReportError(fmt.Errorf("incoming disembargo: context %v not implemented", d.Context().Which()))
c.sendMessage(ctx, func(m rpccp.Message) (err error) {
defer release()

if m, err = m.NewUnimplemented(); err == nil {
err = m.SetDisembargo(d)
}
syncutil.With(&c.lk, func() {
c.sendMessage(ctx, func(m rpccp.Message) (err error) {
if m, err = m.NewUnimplemented(); err == nil {
err = m.SetDisembargo(d)
}

return
}, func(err error) {
c.er.ReportError(rpcerr.Annotate(err, "incoming disembargo: send unimplemented"))
return
}, func(err error) {
defer release()
c.er.ReportError(rpcerr.Annotate(err, "incoming disembargo: send unimplemented"))
})
})
}

Expand Down

0 comments on commit 34de47d

Please sign in to comment.