Skip to content

Commit

Permalink
Merge pull request #434 from zenhack/fix-375
Browse files Browse the repository at this point in the history
Fix #375 (I think)
  • Loading branch information
zenhack authored Jan 19, 2023
2 parents 54d41bf + 5ce76b8 commit 69ea88a
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 18 deletions.
17 changes: 8 additions & 9 deletions rpc/answer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/exc"
"capnproto.org/go/capnp/v3/internal/rc"
"capnproto.org/go/capnp/v3/internal/syncutil"
rpccp "capnproto.org/go/capnp/v3/std/capnp/rpc"
)

Expand Down Expand Up @@ -129,16 +128,16 @@ func (c *Conn) newReturn() (_ rpccp.Return, sendMsg func(), _ *rc.Releaser, _ er
}

// setPipelineCaller sets ans.pcall to pcall if the answer has not
// already returned. The caller MUST NOT hold ans.c.lk.
// already returned. The caller MUST hold ans.c.lk.
//
// This also sets ans.promise to a new promise, wrapping pcall.
func (ans *answer) setPipelineCaller(m capnp.Method, pcall capnp.PipelineCaller) {
syncutil.With(&ans.c.lk, func() {
if !ans.flags.Contains(resultsReady) {
ans.pcall = pcall
ans.promise = capnp.NewPromise(m, pcall)
}
})
func (ans *answer) setPipelineCaller(c *lockedConn, m capnp.Method, pcall capnp.PipelineCaller) {
c.assertIs(ans.c)

if !ans.flags.Contains(resultsReady) {
ans.pcall = pcall
ans.promise = capnp.NewPromise(m, pcall)
}
}

// AllocResults allocates the results struct.
Expand Down
62 changes: 53 additions & 9 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,12 +846,10 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn
c.tasks.Add(1) // will be finished by answer.Return
var callCtx context.Context
callCtx, ans.cancel = context.WithCancel(c.bgctx)
pcall := newPromisedPipelineCaller()
ans.setPipelineCaller(c, p.method, pcall)
rl.Add(func() {
pcall := ent.client.RecvCall(callCtx, recv)
// Place PipelineCaller into answer. Since the receive goroutine is
// the only one that uses answer.pcall, it's fine that there's a
// time gap for this being set.
ans.setPipelineCaller(p.method, pcall)
pcall.resolve(ent.client.RecvCall(callCtx, recv))
})
return nil
case rpccp.MessageTarget_Which_promisedAnswer:
Expand Down Expand Up @@ -908,9 +906,10 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn
c.tasks.Add(1) // will be finished by answer.Return
var callCtx context.Context
callCtx, ans.cancel = context.WithCancel(c.bgctx)
pcall := newPromisedPipelineCaller()
ans.setPipelineCaller(c, p.method, pcall)
rl.Add(func() {
pcall := tgt.RecvCall(callCtx, recv)
ans.setPipelineCaller(p.method, pcall)
pcall.resolve(tgt.RecvCall(callCtx, recv))
})
} else {
// Results not ready, use pipeline caller.
Expand All @@ -919,10 +918,11 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn
callCtx, ans.cancel = context.WithCancel(c.bgctx)
tgt := tgtAns.pcall
c.tasks.Add(1) // will be finished by answer.Return
pcall := newPromisedPipelineCaller()
ans.setPipelineCaller(c, p.method, pcall)
rl.Add(func() {
pcall := tgt.PipelineRecv(callCtx, p.target.transform, recv)
pcall.resolve(tgt.PipelineRecv(callCtx, p.target.transform, recv))
tgtAns.pcalls.Done()
ans.setPipelineCaller(p.method, pcall)
})
}
return nil
Expand All @@ -932,6 +932,50 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn
})
}

// A promisedPipelineCaller is a PipelineCaller that stands in for another
// PipelineCaller that may not be available yet. Methods block until
// resolve() is called.
//
// NOTE WELL: This is meant to stand-in for a very short time, to avoid racy
// behavior between releasing locks and calling resolve(), so even the
// context on the recv/send methods is ignored until underlying caller is
// reserved.
type promisedPipelineCaller struct {
ready chan struct{}
underlying capnp.PipelineCaller
}

func newPromisedPipelineCaller() *promisedPipelineCaller {
return &promisedPipelineCaller{
ready: make(chan struct{}),
underlying: nil,
}
}

// resolve() resolves p to result.
func (p *promisedPipelineCaller) resolve(result capnp.PipelineCaller) {
p.underlying = result
close(p.ready)
}

func (p *promisedPipelineCaller) PipelineSend(
ctx context.Context,
transform []capnp.PipelineOp,
s capnp.Send,
) (*capnp.Answer, capnp.ReleaseFunc) {
<-p.ready
return p.underlying.PipelineSend(ctx, transform, s)
}

func (p *promisedPipelineCaller) PipelineRecv(
ctx context.Context,
transform []capnp.PipelineOp,
r capnp.Recv,
) capnp.PipelineCaller {
<-p.ready
return p.underlying.PipelineRecv(ctx, transform, r)
}

type parsedCall struct {
target parsedMessageTarget
method capnp.Method
Expand Down

0 comments on commit 69ea88a

Please sign in to comment.