Skip to content

Commit

Permalink
Correctly send receiverLoopbacks that target promisedAnswers.
Browse files Browse the repository at this point in the history
Worryingly, this was manifesting in the test as a deadlock: we hit the
error complaining about it not being an import, but then connection
shutdown hung, waiting on tasks. I haven't pinned down exactly what was
going on there, but this sidesteps the issue by fixing the thing that
was causing a connection abort in the first place.
  • Loading branch information
zenhack committed Mar 23, 2023
1 parent 9a40e87 commit e46bf47
Showing 1 changed file with 30 additions and 13 deletions.
43 changes: 30 additions & 13 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1639,33 +1639,50 @@ func (c *Conn) handleDisembargo(ctx context.Context, in transport.IncomingMessag
return exc.WrapError("incoming disembargo", err)
}

imp, ok := client.State().Brand.Value.(*importClient)
if !ok || imp.c != c {
client.Release()
return rpcerr.Failed(errors.New(
"incoming disembargo: sender loopback requested on a capability that is not an import",
))
}
// TODO(maybe): check generation?

// Since this Cap'n Proto RPC implementation does not send imports
// unless they are fully dequeued, we can just immediately loop back.
id := d.Context().SenderLoopback()

c.withLocked(func(c *lockedConn) {
c.sendMessage(ctx, func(m rpccp.Message) error {
d, err := m.NewDisembargo()
if err != nil {
return err
}

d.Context().SetReceiverLoopback(id)
tgt, err := d.NewTarget()
if err != nil {
return err
}

tgt.SetImportedCap(uint32(imp.id))
d.Context().SetReceiverLoopback(id)
return nil
brand := client.State().Brand
if pc, ok := brand.Value.(capnp.PipelineClient); ok {
if q, ok := c.getAnswerQuestion(pc.Answer()); ok {
if q.c == (*Conn)(c) {
pa, err := tgt.NewPromisedAnswer()
if err != nil {
return err
}
pa.SetQuestionId(uint32(q.id))
pcTrans := pc.Transform()
trans, err := pa.NewTransform(int32(len(pcTrans)))
if err != nil {
return err
}
for i, op := range pcTrans {
trans.At(i).SetGetPointerField(op.Field)
}
}
return nil
}
}

imp, ok := brand.Value.(*importClient)
if ok && imp.c == (*Conn)(c) {
tgt.SetImportedCap(uint32(imp.id))
return nil
}
return errors.New("target for receiver loopback does not point to the right connection")

}, func(err error) {
defer in.Release()
Expand Down

0 comments on commit e46bf47

Please sign in to comment.