Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up/fix locking in handleDisembargo #396

Merged
merged 1 commit into from
Dec 22, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 36 additions & 40 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1389,64 +1389,60 @@ func (c *Conn) handleDisembargo(ctx context.Context, d rpccp.Disembargo, release
return
}

client := iface.Client()

var ok bool
syncutil.Without(&c.lk, func() {
imp, ok = client.State().Brand.Value.(*importClient)
})

if !ok || imp.c != c {
client.Release()
err = rpcerr.Failedf("incoming disembargo: sender loopback requested on a capability that is not an import")
return
}

// TODO(maybe): check generation?
client = iface.Client()
})

if err != nil {
release()
return err
}

imp, ok := client.State().Brand.Value.(*importClient)
if !ok || imp.c != c {
client.Release()
return rpcerr.Failedf("incoming disembargo: sender loopback requested on a capability that is not an import")
}
// TODO(maybe): check generation?

// Since this Cap'n Proto RPC implementation does not send imports
// unless they are fully dequeued, we can just immediately loop back.
id := d.Context().SenderLoopback()
c.sendMessage(ctx, func(m rpccp.Message) error {
defer release()
defer client.Release()

d, err := m.NewDisembargo()
if err != nil {
return err
}
syncutil.With(&c.lk, func() {
c.sendMessage(ctx, func(m rpccp.Message) error {
d, err := m.NewDisembargo()
if err != nil {
return err
}

tgt, err := d.NewTarget()
if err != nil {
return err
}
tgt, err := d.NewTarget()
if err != nil {
return err
}

tgt.SetImportedCap(uint32(imp.id))
d.Context().SetReceiverLoopback(id)
return nil
tgt.SetImportedCap(uint32(imp.id))
d.Context().SetReceiverLoopback(id)
return nil

}, func(err error) {
c.er.ReportError(rpcerr.Annotatef(err, "incoming disembargo: send receiver loopback"))
}, func(err error) {
defer release()
defer client.Release()
c.er.ReportError(rpcerr.Annotatef(err, "incoming disembargo: send receiver loopback"))
})
})

default:
c.er.ReportError(fmt.Errorf("incoming disembargo: context %v not implemented", d.Context().Which()))
c.sendMessage(ctx, func(m rpccp.Message) (err error) {
defer release()

if m, err = m.NewUnimplemented(); err == nil {
err = m.SetDisembargo(d)
}
syncutil.With(&c.lk, func() {
c.sendMessage(ctx, func(m rpccp.Message) (err error) {
if m, err = m.NewUnimplemented(); err == nil {
err = m.SetDisembargo(d)
}

return
}, func(err error) {
c.er.ReportError(rpcerr.Annotate(err, "incoming disembargo: send unimplemented"))
return
}, func(err error) {
defer release()
c.er.ReportError(rpcerr.Annotate(err, "incoming disembargo: send unimplemented"))
})
})
}

Expand Down