Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle incoming resolve messages. #480

Closed
wants to merge 36 commits into from
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
a3ffc4e
WIP: handle incoming resolve messages.
zenhack Mar 16, 2023
36b13bf
cleanup: remove unnecessary declaration of importClient.
zenhack Mar 18, 2023
3cbff97
Factor some logic out of handleDisembargo
zenhack Mar 18, 2023
9fbd8c6
First stab at disembargos on imports.
zenhack Mar 18, 2023
2061cfa
Add a test for disembargos on senderPromises.
zenhack Mar 19, 2023
d50b6ce
Add another test wrt promise resolution.
zenhack Mar 19, 2023
b11ac49
Fix the ownership semantics for NewLocalPromise
zenhack Mar 21, 2023
a1028be
Remove no longer necessary .Release()
zenhack Mar 21, 2023
0799051
Merge remote-tracking branch 'origin/main' into handle-resolve
zenhack Mar 22, 2023
9a40e87
Minor style cleanup
zenhack Mar 23, 2023
e46bf47
Correctly send receiverLoopbacks that target promisedAnswers.
zenhack Mar 23, 2023
14b7864
Merge remote-tracking branch 'origin/main' into handle-resolve
zenhack Mar 24, 2023
49578d5
Merge remote-tracking branch 'origin/main' into handle-resolve
zenhack Mar 30, 2023
3a32a77
Fix build errors due to bitrot.
zenhack Mar 30, 2023
c78b076
Clean up the way local promises work.
zenhack Mar 30, 2023
d7861ab
Merge branch 'localpromise-fixes' into handle-resolve
zenhack Mar 30, 2023
2e138f0
WIP: put a ClientSnapshot in the exports table
zenhack May 25, 2023
9999902
ClientSnapshot.Release(): take pointer receiver.
zenhack May 26, 2023
7213fc1
Do leak reporting for ClientSnapshots, not just Clients.
zenhack May 26, 2023
1cc3130
Merge remote-tracking branch 'origin/main' into handle-resolve
zenhack May 27, 2023
8d71f0b
Merge branch 'export-snapshot' into handle-resolve
zenhack May 27, 2023
91d062a
CapTable: rename Get -> GetClient, Add -> AddClient
zenhack May 27, 2023
5e0023a
CapTable: Add *Snapshot variants of Add & Get
zenhack May 27, 2023
abf193b
Separate client/snapshot versions of CapTable.At()
zenhack May 27, 2023
76f95d5
Remove argument to CapTable.Reset()
zenhack May 27, 2023
61ca9e5
CapTable.Set: split into client/snapshot variants
zenhack May 27, 2023
f967703
CapTable: change internal table to store snapshots.
zenhack May 27, 2023
fe08d87
WIP: Add a Steal() method to Client and ClientSnapshot
zenhack May 27, 2023
afeb534
Add missing call to .AddRef()
zenhack May 27, 2023
7a23f9b
pogs tests: clone input before inserting.
zenhack May 27, 2023
2372b53
Mark TestDuplicateBootstrap as flaky.
zenhack May 27, 2023
1acb315
Merge branch 'client.Steal' into captable-snapshots
zenhack May 27, 2023
843e8b6
Merge branch '523-flaky' into captable-snapshots
zenhack May 27, 2023
575916e
CapTable: maintain snapshots & clients in parallel.
zenhack May 27, 2023
75200fc
Merge branch 'captable-snapshots' into handle-resolve
zenhack May 27, 2023
0a99d60
Fix build errors.
zenhack May 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 27 additions & 16 deletions localpromise.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
)

// ClientHook for a promise that will be resolved to some other capability
// at some point. Buffers calls in a queue until the promsie is fulfilled,
// at some point. Buffers calls in a queue until the promise is fulfilled,
// then forwards them.
type localPromise struct {
aq *AnswerQueue
}

// NewLocalPromise returns a client that will eventually resolve to a capability,
// supplied via the fulfiller.
func NewLocalPromise[C ~ClientKind]() (C, Resolver[C]) {
// supplied via resolver. resolver.Fulfill steals the reference to its argument.
func NewLocalPromise[C ~ClientKind]() (promise C, resolver Resolver[C]) {
lp := newLocalPromise()
p, f := NewPromisedClient(lp)
return C(p), localResolver[C]{
Expand Down Expand Up @@ -44,27 +44,38 @@ func (lp localPromise) String() string {
return "localPromise{...}"
}

func (lp localPromise) Fulfill(c Client) {
msg, seg := NewSingleSegmentMessage(nil)
capID := msg.AddCap(c)
lp.aq.Fulfill(NewInterface(seg, capID).ToPtr())
}

func (lp localPromise) Reject(err error) {
lp.aq.Reject(err)
}

type localResolver[C ~ClientKind] struct {
lp localPromise
clientResolver Resolver[Client]
}

func (lf localResolver[C]) Fulfill(c C) {
lf.lp.Fulfill(Client(c))
lf.clientResolver.Fulfill(Client(c))
// This is convoluted, for a few reasons:
//
// 1. AnswerQueue wants a Ptr, not a Client, so we have to construct
// a message for this.
// 2. Message.AddCap steals the reference, so we have to get the client
// back from the message instead of using the reference we already
// have.
// 3. The semantics of NewPromisedClient differ from what we want, and
// are kindof odd: when it is resolved it does not steal the reference,
// nor does it borrow it -- instead it merges the refcounts, so that
// the two clients point to the same place. So we have to drop our
// reference to get the semantics we want.
//
// TODO: We should probably push this part down into the implementation of
// clientPromise. That requires auditing its uses and adjusting call sites
// though.
msg, seg := NewSingleSegmentMessage(nil)
capID := msg.AddCap(Client(c))
iface := NewInterface(seg, capID)
client := iface.Client()
defer client.Release()
lf.lp.aq.Fulfill(iface.ToPtr())
lf.clientResolver.Fulfill(client)
}

func (lf localResolver[C]) Reject(err error) {
lf.lp.Reject(err)
lf.lp.aq.Reject(err)
lf.clientResolver.Reject(err)
}
44 changes: 26 additions & 18 deletions rpc/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ func (c *lockedConn) releaseExport(id exportID, count uint32) (capnp.Client, err
c.lk.exports[id] = nil
c.lk.exportID.remove(uint32(id))
metadata := client.State().Metadata
syncutil.With(metadata, func() {
c.clearExportID(metadata)
})
if metadata != nil {
syncutil.With(metadata, func() {
c.clearExportID(metadata)
})
}
return client, nil
case count > ent.wireRefs:
return capnp.Client{}, rpcerr.Failed(errors.New("export ID " + str.Utod(id) + " released too many references"))
Expand Down Expand Up @@ -367,33 +369,39 @@ func (e *embargo) Shutdown() {
// senderLoopback holds the salient information for a sender-loopback
// Disembargo message.
type senderLoopback struct {
id embargoID
question questionID
transform []capnp.PipelineOp
id embargoID
target parsedMessageTarget
}

func (sl *senderLoopback) buildDisembargo(msg rpccp.Message) error {
d, err := msg.NewDisembargo()
if err != nil {
return rpcerr.WrapFailed("build disembargo", err)
}
d.Context().SetSenderLoopback(uint32(sl.id))
tgt, err := d.NewTarget()
if err != nil {
return rpcerr.WrapFailed("build disembargo", err)
}
pa, err := tgt.NewPromisedAnswer()
if err != nil {
return rpcerr.WrapFailed("build disembargo", err)
}
oplist, err := pa.NewTransform(int32(len(sl.transform)))
if err != nil {
return rpcerr.WrapFailed("build disembargo", err)
}
switch sl.target.which {
case rpccp.MessageTarget_Which_promisedAnswer:
pa, err := tgt.NewPromisedAnswer()
if err != nil {
return rpcerr.WrapFailed("build disembargo", err)
}
oplist, err := pa.NewTransform(int32(len(sl.target.transform)))
if err != nil {
return rpcerr.WrapFailed("build disembargo", err)
}

d.Context().SetSenderLoopback(uint32(sl.id))
pa.SetQuestionId(uint32(sl.question))
for i, op := range sl.transform {
oplist.At(i).SetGetPointerField(op.Field)
pa.SetQuestionId(uint32(sl.target.promisedAnswer))
for i, op := range sl.target.transform {
oplist.At(i).SetGetPointerField(op.Field)
}
case rpccp.MessageTarget_Which_importedCap:
tgt.SetImportedCap(uint32(sl.target.importedCap))
default:
return errors.New("unknown variant for MessageTarget: " + str.Utod(sl.target.which))
}
return nil
}
21 changes: 18 additions & 3 deletions rpc/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,19 @@ type impent struct {
// importClient's generation matches the entry's generation before
// removing the entry from the table and sending a release message.
generation uint64

// If resolver is non-nil, then this is a promise (received as
// CapDescriptor_Which_senderPromise), and when a resolve message
// arrives we should use this to fulfill the promise locally.
resolver capnp.Resolver[capnp.Client]
}

// addImport returns a client that represents the given import,
// incrementing the number of references to this import from this vat.
// This is separate from the reference counting that capnp.Client does.
//
// The caller must be holding onto c.mu.
func (c *lockedConn) addImport(id importID) capnp.Client {
func (c *lockedConn) addImport(id importID, isPromise bool) capnp.Client {
if ent := c.lk.imports[id]; ent != nil {
ent.wireRefs++
client, ok := ent.wc.AddRef()
Expand All @@ -67,13 +72,23 @@ func (c *lockedConn) addImport(id importID) capnp.Client {
}
return client
}
client := capnp.NewClient(&importClient{
hook := &importClient{
c: (*Conn)(c),
id: id,
})
}
var (
client capnp.Client
resolver capnp.Resolver[capnp.Client]
)
if isPromise {
client, resolver = capnp.NewPromisedClient(hook)
} else {
client = capnp.NewClient(hook)
}
c.lk.imports[id] = &impent{
wc: client.WeakRef(),
wireRefs: 1,
resolver: resolver,
}
return client
}
Expand Down
42 changes: 26 additions & 16 deletions rpc/localpromise_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,10 @@ func TestLocalPromiseFulfill(t *testing.T) {
})
defer rel2()

pp := testcapnp.PingPong_ServerToClient(&echoNumOrderChecker{
r.Fulfill(testcapnp.PingPong_ServerToClient(&echoNumOrderChecker{
t: t,
nextNum: 1,
})
defer pp.Release()
r.Fulfill(pp)
}))

fut3, rel3 := p.EchoNum(ctx, func(p testcapnp.PingPong_echoNum_Params) error {
p.SetN(3)
Expand All @@ -70,31 +68,29 @@ func TestLocalPromiseFulfill(t *testing.T) {
assert.Equal(t, int64(3), res3.N())
}

func echoNum(ctx context.Context, pp testcapnp.PingPong, n int64) (testcapnp.PingPong_echoNum_Results_Future, capnp.ReleaseFunc) {
return pp.EchoNum(ctx, func(p testcapnp.PingPong_echoNum_Params) error {
p.SetN(n)
return nil
})
}

func TestLocalPromiseReject(t *testing.T) {
t.Parallel()

ctx := context.Background()
p, r := capnp.NewLocalPromise[testcapnp.PingPong]()
defer p.Release()

fut1, rel1 := p.EchoNum(ctx, func(p testcapnp.PingPong_echoNum_Params) error {
p.SetN(1)
return nil
})
fut1, rel1 := echoNum(ctx, p, 1)
defer rel1()

fut2, rel2 := p.EchoNum(ctx, func(p testcapnp.PingPong_echoNum_Params) error {
p.SetN(2)
return nil
})
fut2, rel2 := echoNum(ctx, p, 2)
defer rel2()

r.Reject(errors.New("Promise rejected"))

fut3, rel3 := p.EchoNum(ctx, func(p testcapnp.PingPong_echoNum_Params) error {
p.SetN(3)
return nil
})
fut3, rel3 := echoNum(ctx, p, 3)
defer rel3()

_, err := fut1.Struct()
Expand All @@ -106,3 +102,17 @@ func TestLocalPromiseReject(t *testing.T) {
_, err = fut3.Struct()
assert.NotNil(t, err)
}

// Test that the promise owns the capability it resolves to; no separate
// release should be necessary.
func TestLocalPromiseOwnsResult(t *testing.T) {
t.Parallel()

p, r := capnp.NewLocalPromise[testcapnp.PingPong]()
defer p.Release()

r.Fulfill(testcapnp.PingPong_ServerToClient(&echoNumOrderChecker{
t: t,
nextNum: 1,
}))
}
Loading