From e6b8cb6e8c7490b8b8c0c92289889f8384ea3276 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 11 Feb 2023 22:25:09 -0500 Subject: [PATCH 01/11] WIP: Streaming support helpers. This patch begins work on ergonomic improvements to streaming workflows. The plan is to update the code generator to use SendStreamingCall for methods whose return type is stream, and have them similarly just return streamError instead of a result, release pair. This lets the caller just invoke the method in a loop and check the error each time through (or only at the end on a done() method or the like if they don't care about short-circuiting). --- capability.go | 41 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/capability.go b/capability.go index 85cbf280..0e65483f 100644 --- a/capability.go +++ b/capability.go @@ -8,6 +8,7 @@ import ( "strconv" "sync" + "capnproto.org/go/capnp/v3/exc" "capnproto.org/go/capnp/v3/exp/bufferpool" "capnproto.org/go/capnp/v3/flowcontrol" "capnproto.org/go/capnp/v3/internal/str" @@ -122,10 +123,11 @@ type client struct { creatorStack string creatorLine int - mu sync.Mutex // protects the struct - limiter flowcontrol.FlowLimiter - h *clientHook // nil if resolved to nil or released - released bool + mu sync.Mutex // protects the struct + limiter flowcontrol.FlowLimiter + streamError error // Last error from streaming calls. + h *clientHook // nil if resolved to nil or released + released bool } // clientHook is a reference-counted wrapper for a ClientHook. @@ -321,6 +323,10 @@ func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) { return ErrorAnswer(s.Method, errors.New("call on null client")), func() {} } + if c.streamError != nil { + return ErrorAnswer(s.Method, exc.WrapError("streamError", c.streamError)), func() {} + } + limiter := c.GetFlowLimiter() // We need to call PlaceArgs before we will know the size of message for @@ -381,6 +387,33 @@ func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) { return ans, rel } +// SendStreamCall is like SendCall except that: +// +// 1. It does not return an answer for the eventual result. +// 2. If the call returns an error, all future calls on this +// client will reutrn the same error. (without starting +// the method or calling PlaceArgs). +func (c Client) SendStreamCall(ctx context.Context, s Send) error { + var streamError error + syncutil.With(&c.mu, func() { + streamError = c.streamError + }) + if streamError != nil { + return streamError + } + ans, release := c.SendCall(ctx, s) + go func() { + _, err := ans.Future().Ptr() + release() + if err != nil { + syncutil.With(&c.mu, func() { + c.streamError = err + }) + } + }() + return nil +} + // RecvCall starts executing a method with the referenced arguments // and returns an answer that will hold the result. The hook will call // a.Release when it no longer needs to reference the parameters. The From 636863dbed1607a009619e48f050db190a8fefea Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 11 Feb 2023 23:26:17 -0500 Subject: [PATCH 02/11] Fix typos --- capability.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capability.go b/capability.go index 0e65483f..bf834f9e 100644 --- a/capability.go +++ b/capability.go @@ -391,7 +391,7 @@ func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) { // // 1. It does not return an answer for the eventual result. // 2. If the call returns an error, all future calls on this -// client will reutrn the same error. (without starting +// client will return the same error (without starting // the method or calling PlaceArgs). func (c Client) SendStreamCall(ctx context.Context, s Send) error { var streamError error From 28d70b0a99c65b912da60ec0d86366a9a1d9a9cb Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 11 Feb 2023 23:28:04 -0500 Subject: [PATCH 03/11] Don't use camelCase in error message. No reason to copy the variable name here, which is private. --- capability.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capability.go b/capability.go index bf834f9e..8bf20e63 100644 --- a/capability.go +++ b/capability.go @@ -324,7 +324,7 @@ func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) { } if c.streamError != nil { - return ErrorAnswer(s.Method, exc.WrapError("streamError", c.streamError)), func() {} + return ErrorAnswer(s.Method, exc.WrapError("stream error", c.streamError)), func() {} } limiter := c.GetFlowLimiter() From 27820f0223dfad3bc00d17c4376583332be4d54f Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sat, 11 Feb 2023 23:59:32 -0500 Subject: [PATCH 04/11] Code generator: handle streaming methods. This patch special cases generated streaming methods, and uses SendStreamCall instead of SendCall. It updates tests & schema accordingly. The change to the streaming benchmark is a nice example of how this improves ergonomics, though we had to add a done() method to make it work. --- capnpc-go/nodes.go | 5 + capnpc-go/templates/interfaceClient | 13 +- .../internal/test-tool/writer.capnp.go | 19 +- internal/aircraftlib/aircraft.capnp.go | 25 +- rpc/bench_test.go | 29 +- rpc/flow_test.go | 4 + rpc/internal/testcapnp/test.capnp | 1 + rpc/internal/testcapnp/test.capnp.go | 312 +++++++++++++++--- std/capnp/persistent/persistent.capnp.go | 7 +- 9 files changed, 335 insertions(+), 80 deletions(-) diff --git a/capnpc-go/nodes.go b/capnpc-go/nodes.go index e68c4b8a..8a71a2ee 100644 --- a/capnpc-go/nodes.go +++ b/capnpc-go/nodes.go @@ -7,6 +7,7 @@ import ( "capnproto.org/go/capnp/v3" "capnproto.org/go/capnp/v3/internal/schema" + "capnproto.org/go/capnp/v3/std/capnp/stream" ) type node struct { @@ -100,6 +101,10 @@ type interfaceMethod struct { Results *node } +func (m interfaceMethod) IsStreaming() bool { + return m.Results.Id() == stream.StreamResult_TypeID +} + func methodSet(methods []interfaceMethod, n *node, nodes nodeMap) ([]interfaceMethod, error) { ms, _ := n.Interface().Methods() for i := 0; i < ms.Len(); i++ { diff --git a/capnpc-go/templates/interfaceClient b/capnpc-go/templates/interfaceClient index 8d419958..279810f6 100644 --- a/capnpc-go/templates/interfaceClient +++ b/capnpc-go/templates/interfaceClient @@ -6,7 +6,11 @@ type {{.Node.Name}} capnp.Client {{ template "_typeid" .Node }} {{range .Methods -}} -func (c {{$.Node.Name}}) {{.Name|title}}(ctx {{$.G.Imports.Context}}.Context, params func({{$.G.RemoteNodeName .Params $.Node}}) error) ({{$.G.RemoteNodeName .Results $.Node}}_Future, capnp.ReleaseFunc) { + +func (c {{$.Node.Name}}) {{.Name|title}}(ctx {{$.G.Imports.Context}}.Context, params func({{$.G.RemoteNodeName .Params $.Node}}) error) +{{- if .IsStreaming }} error { +{{- else }} ({{$.G.RemoteNodeName .Results $.Node}}_Future, capnp.ReleaseFunc) { +{{ end }} s := capnp.Send{ Method: capnp.Method{ {{template "_interfaceMethod" .}} @@ -16,9 +20,14 @@ func (c {{$.Node.Name}}) {{.Name|title}}(ctx {{$.G.Imports.Context}}.Context, pa s.ArgsSize = {{$.G.ObjectSize .Params}} s.PlaceArgs = func(s capnp.Struct) error { return params({{$.G.RemoteNodeName .Params $.Node}}(s)) } } +{{ if .IsStreaming }} + return capnp.Client(c).SendStreamCall(ctx, s) +{{ else }} ans, release := capnp.Client(c).SendCall(ctx, s) return {{$.G.RemoteNodeName .Results $.Node}}_Future{Future: ans.Future()}, release +{{ end }} } + {{end}} // String returns a string that identifies this capability for debugging @@ -86,4 +95,4 @@ func (c {{$.Node.Name}}) SetFlowLimiter(lim {{.G.Imports.FlowControl}}.FlowLimit // for this client. func (c {{$.Node.Name}}) GetFlowLimiter() {{.G.Imports.FlowControl}}.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} \ No newline at end of file +} diff --git a/flowcontrol/internal/test-tool/writer.capnp.go b/flowcontrol/internal/test-tool/writer.capnp.go index 471cad5c..56067848 100644 --- a/flowcontrol/internal/test-tool/writer.capnp.go +++ b/flowcontrol/internal/test-tool/writer.capnp.go @@ -18,6 +18,7 @@ type Writer capnp.Client const Writer_TypeID = 0xf82e58b4a78f136b func (c Writer) Write(ctx context.Context, params func(Writer_write_Params) error) (Writer_write_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xf82e58b4a78f136b, @@ -30,8 +31,10 @@ func (c Writer) Write(ctx context.Context, params func(Writer_write_Params) erro s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 1} s.PlaceArgs = func(s capnp.Struct) error { return params(Writer_write_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return Writer_write_Results_Future{Future: ans.Future()}, release + } // String returns a string that identifies this capability for debugging @@ -99,7 +102,9 @@ func (c Writer) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c Writer) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A Writer_Server is a Writer with a local implementation. +} + +// A Writer_Server is a Writer with a local implementation. type Writer_Server interface { Write(context.Context, Writer_write) error } @@ -236,9 +241,9 @@ func NewWriter_write_Params_List(s *capnp.Segment, sz int32) (Writer_write_Param // Writer_write_Params_Future is a wrapper for a Writer_write_Params promised by a client call. type Writer_write_Params_Future struct{ *capnp.Future } -func (p Writer_write_Params_Future) Struct() (Writer_write_Params, error) { - s, err := p.Future.Struct() - return Writer_write_Params(s), err +func (f Writer_write_Params_Future) Struct() (Writer_write_Params, error) { + p, err := f.Future.Ptr() + return Writer_write_Params(p.Struct()), err } type Writer_write_Results capnp.Struct @@ -301,9 +306,9 @@ func NewWriter_write_Results_List(s *capnp.Segment, sz int32) (Writer_write_Resu // Writer_write_Results_Future is a wrapper for a Writer_write_Results promised by a client call. type Writer_write_Results_Future struct{ *capnp.Future } -func (p Writer_write_Results_Future) Struct() (Writer_write_Results, error) { - s, err := p.Future.Struct() - return Writer_write_Results(s), err +func (f Writer_write_Results_Future) Struct() (Writer_write_Results, error) { + p, err := f.Future.Ptr() + return Writer_write_Results(p.Struct()), err } const schema_aca73f831c7ebfdd = "x\xda\x12\xa8u`1\xe4\xcdgb`\x0a\x94ae" + diff --git a/internal/aircraftlib/aircraft.capnp.go b/internal/aircraftlib/aircraft.capnp.go index 9b2baa09..5ef218c7 100644 --- a/internal/aircraftlib/aircraft.capnp.go +++ b/internal/aircraftlib/aircraft.capnp.go @@ -5054,6 +5054,7 @@ type Echo capnp.Client const Echo_TypeID = 0x8e5322c1e9282534 func (c Echo) Echo(ctx context.Context, params func(Echo_echo_Params) error) (Echo_echo_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0x8e5322c1e9282534, @@ -5066,8 +5067,10 @@ func (c Echo) Echo(ctx context.Context, params func(Echo_echo_Params) error) (Ec s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 1} s.PlaceArgs = func(s capnp.Struct) error { return params(Echo_echo_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return Echo_echo_Results_Future{Future: ans.Future()}, release + } // String returns a string that identifies this capability for debugging @@ -5135,7 +5138,9 @@ func (c Echo) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c Echo) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A Echo_Server is a Echo with a local implementation. +} + +// A Echo_Server is a Echo with a local implementation. type Echo_Server interface { Echo(context.Context, Echo_echo) error } @@ -5839,6 +5844,7 @@ type CallSequence capnp.Client const CallSequence_TypeID = 0xabaedf5f7817c820 func (c CallSequence) GetNumber(ctx context.Context, params func(CallSequence_getNumber_Params) error) (CallSequence_getNumber_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xabaedf5f7817c820, @@ -5851,8 +5857,10 @@ func (c CallSequence) GetNumber(ctx context.Context, params func(CallSequence_ge s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} s.PlaceArgs = func(s capnp.Struct) error { return params(CallSequence_getNumber_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return CallSequence_getNumber_Results_Future{Future: ans.Future()}, release + } // String returns a string that identifies this capability for debugging @@ -5920,7 +5928,9 @@ func (c CallSequence) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c CallSequence) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A CallSequence_Server is a CallSequence with a local implementation. +} + +// A CallSequence_Server is a CallSequence with a local implementation. type CallSequence_Server interface { GetNumber(context.Context, CallSequence_getNumber) error } @@ -6128,6 +6138,7 @@ type Pipeliner capnp.Client const Pipeliner_TypeID = 0xd6514008f0f84ebc func (c Pipeliner) NewPipeliner(ctx context.Context, params func(Pipeliner_newPipeliner_Params) error) (Pipeliner_newPipeliner_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xd6514008f0f84ebc, @@ -6140,10 +6151,14 @@ func (c Pipeliner) NewPipeliner(ctx context.Context, params func(Pipeliner_newPi s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} s.PlaceArgs = func(s capnp.Struct) error { return params(Pipeliner_newPipeliner_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return Pipeliner_newPipeliner_Results_Future{Future: ans.Future()}, release + } + func (c Pipeliner) GetNumber(ctx context.Context, params func(CallSequence_getNumber_Params) error) (CallSequence_getNumber_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xabaedf5f7817c820, @@ -6156,8 +6171,10 @@ func (c Pipeliner) GetNumber(ctx context.Context, params func(CallSequence_getNu s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} s.PlaceArgs = func(s capnp.Struct) error { return params(CallSequence_getNumber_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return CallSequence_getNumber_Results_Future{Future: ans.Future()}, release + } // String returns a string that identifies this capability for debugging @@ -6225,7 +6242,9 @@ func (c Pipeliner) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c Pipeliner) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A Pipeliner_Server is a Pipeliner with a local implementation. +} + +// A Pipeliner_Server is a Pipeliner with a local implementation. type Pipeliner_Server interface { NewPipeliner(context.Context, Pipeliner_newPipeliner) error diff --git a/rpc/bench_test.go b/rpc/bench_test.go index 9eceb1e5..5df0ee0f 100644 --- a/rpc/bench_test.go +++ b/rpc/bench_test.go @@ -10,7 +10,6 @@ import ( "capnproto.org/go/capnp/v3/flowcontrol" "capnproto.org/go/capnp/v3/rpc" testcp "capnproto.org/go/capnp/v3/rpc/internal/testcapnp" - "capnproto.org/go/capnp/v3/std/capnp/stream" ) type benchmarkStreamingConfig struct { @@ -47,30 +46,24 @@ func benchmarkStreaming(b *testing.B, cfg *benchmarkStreamingConfig) { defer conn2.Close() bootstrap := testcp.StreamTest(conn2.Bootstrap(ctx)) defer bootstrap.Release() - var ( - futures []stream.StreamResult_Future - releaseFuncs []capnp.ReleaseFunc - ) bootstrap.SetFlowLimiter(flowcontrol.NewFixedLimiter(cfg.FlowLimit)) data := make([]byte, cfg.MessageSize) b.ResetTimer() for i := 0; i < b.N; i++ { for j := 0; j < cfg.MessageCount; j++ { - fut, rel := bootstrap.Push(ctx, func(p testcp.StreamTest_push_Params) error { + err := bootstrap.Push(ctx, func(p testcp.StreamTest_push_Params) error { return p.SetData(data) }) - futures = append(futures, fut) - releaseFuncs = append(releaseFuncs, rel) + if err != nil { + b.Fatalf("Streaming call #%v failed: %v", j, err) + } } } - for i, fut := range futures { - _, err := fut.Struct() - if err != nil { - b.Errorf("Error waiting on future #%v: %v", i, err) - } - } - for _, rel := range releaseFuncs { - rel() + fut, rel := bootstrap.Done(ctx, nil) + defer rel() + _, err := fut.Struct() + if err != nil { + b.Errorf("Error waiting on done() future: %v", err) } } @@ -82,6 +75,10 @@ func (nullStream) Push(context.Context, testcp.StreamTest_push) error { return nil } +func (nullStream) Done(context.Context, testcp.StreamTest_done) error { + return nil +} + func BenchmarkPingPong(b *testing.B) { p1, p2 := net.Pipe() srv := testcp.PingPong_ServerToClient(pingPongServer{}) diff --git a/rpc/flow_test.go b/rpc/flow_test.go index dd4f3973..fabd7c8d 100644 --- a/rpc/flow_test.go +++ b/rpc/flow_test.go @@ -144,3 +144,7 @@ func (slowStreamTestServer) Push(ctx context.Context, p testcapnp.StreamTest_pus time.Sleep(200 * time.Millisecond) return nil } + +func (slowStreamTestServer) Done(context.Context, testcapnp.StreamTest_done) error { + return nil +} diff --git a/rpc/internal/testcapnp/test.capnp b/rpc/internal/testcapnp/test.capnp index 39c69ec6..1fc9e921 100644 --- a/rpc/internal/testcapnp/test.capnp +++ b/rpc/internal/testcapnp/test.capnp @@ -12,6 +12,7 @@ interface PingPong { interface StreamTest { push @0 (data :Data) -> stream; + done @1 (); } interface CapArgsTest { diff --git a/rpc/internal/testcapnp/test.capnp.go b/rpc/internal/testcapnp/test.capnp.go index 03dc42f3..3144adf9 100644 --- a/rpc/internal/testcapnp/test.capnp.go +++ b/rpc/internal/testcapnp/test.capnp.go @@ -19,6 +19,7 @@ type PingPong capnp.Client const PingPong_TypeID = 0xf004c474c2f8ee7a func (c PingPong) EchoNum(ctx context.Context, params func(PingPong_echoNum_Params) error) (PingPong_echoNum_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xf004c474c2f8ee7a, @@ -31,8 +32,10 @@ func (c PingPong) EchoNum(ctx context.Context, params func(PingPong_echoNum_Para s.ArgsSize = capnp.ObjectSize{DataSize: 8, PointerCount: 0} s.PlaceArgs = func(s capnp.Struct) error { return params(PingPong_echoNum_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return PingPong_echoNum_Results_Future{Future: ans.Future()}, release + } // String returns a string that identifies this capability for debugging @@ -100,7 +103,9 @@ func (c PingPong) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c PingPong) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A PingPong_Server is a PingPong with a local implementation. +} + +// A PingPong_Server is a PingPong with a local implementation. type PingPong_Server interface { EchoNum(context.Context, PingPong_echoNum) error } @@ -314,7 +319,7 @@ type StreamTest capnp.Client // StreamTest_TypeID is the unique identifier for the type StreamTest. const StreamTest_TypeID = 0xbb3ca85b01eea465 -func (c StreamTest) Push(ctx context.Context, params func(StreamTest_push_Params) error) (stream.StreamResult_Future, capnp.ReleaseFunc) { +func (c StreamTest) Push(ctx context.Context, params func(StreamTest_push_Params) error) error { s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xbb3ca85b01eea465, @@ -327,8 +332,29 @@ func (c StreamTest) Push(ctx context.Context, params func(StreamTest_push_Params s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 1} s.PlaceArgs = func(s capnp.Struct) error { return params(StreamTest_push_Params(s)) } } + + return capnp.Client(c).SendStreamCall(ctx, s) + +} + +func (c StreamTest) Done(ctx context.Context, params func(StreamTest_done_Params) error) (StreamTest_done_Results_Future, capnp.ReleaseFunc) { + + s := capnp.Send{ + Method: capnp.Method{ + InterfaceID: 0xbb3ca85b01eea465, + MethodID: 1, + InterfaceName: "test.capnp:StreamTest", + MethodName: "done", + }, + } + if params != nil { + s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} + s.PlaceArgs = func(s capnp.Struct) error { return params(StreamTest_done_Params(s)) } + } + ans, release := capnp.Client(c).SendCall(ctx, s) - return stream.StreamResult_Future{Future: ans.Future()}, release + return StreamTest_done_Results_Future{Future: ans.Future()}, release + } // String returns a string that identifies this capability for debugging @@ -396,9 +422,13 @@ func (c StreamTest) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c StreamTest) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A StreamTest_Server is a StreamTest with a local implementation. +} + +// A StreamTest_Server is a StreamTest with a local implementation. type StreamTest_Server interface { Push(context.Context, StreamTest_push) error + + Done(context.Context, StreamTest_done) error } // StreamTest_NewServer creates a new Server from an implementation of StreamTest_Server. @@ -417,7 +447,7 @@ func StreamTest_ServerToClient(s StreamTest_Server) StreamTest { // This can be used to create a more complicated Server. func StreamTest_Methods(methods []server.Method, s StreamTest_Server) []server.Method { if cap(methods) == 0 { - methods = make([]server.Method, 0, 1) + methods = make([]server.Method, 0, 2) } methods = append(methods, server.Method{ @@ -432,6 +462,18 @@ func StreamTest_Methods(methods []server.Method, s StreamTest_Server) []server.M }, }) + methods = append(methods, server.Method{ + Method: capnp.Method{ + InterfaceID: 0xbb3ca85b01eea465, + MethodID: 1, + InterfaceName: "test.capnp:StreamTest", + MethodName: "done", + }, + Impl: func(ctx context.Context, call *server.Call) error { + return s.Done(ctx, StreamTest_done{call}) + }, + }) + return methods } @@ -452,6 +494,23 @@ func (c StreamTest_push) AllocResults() (stream.StreamResult, error) { return stream.StreamResult(r), err } +// StreamTest_done holds the state for a server call to StreamTest.done. +// See server.Call for documentation. +type StreamTest_done struct { + *server.Call +} + +// Args returns the call's arguments. +func (c StreamTest_done) Args() StreamTest_done_Params { + return StreamTest_done_Params(c.Call.Args()) +} + +// AllocResults allocates the results struct. +func (c StreamTest_done) AllocResults() (StreamTest_done_Results, error) { + r, err := c.Call.AllocResults(capnp.ObjectSize{DataSize: 0, PointerCount: 0}) + return StreamTest_done_Results(r), err +} + // StreamTest_List is a list of StreamTest. type StreamTest_List = capnp.CapList[StreamTest] @@ -538,12 +597,143 @@ func (f StreamTest_push_Params_Future) Struct() (StreamTest_push_Params, error) return StreamTest_push_Params(p.Struct()), err } +type StreamTest_done_Params capnp.Struct + +// StreamTest_done_Params_TypeID is the unique identifier for the type StreamTest_done_Params. +const StreamTest_done_Params_TypeID = 0x86370ae31868a1e2 + +func NewStreamTest_done_Params(s *capnp.Segment) (StreamTest_done_Params, error) { + st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) + return StreamTest_done_Params(st), err +} + +func NewRootStreamTest_done_Params(s *capnp.Segment) (StreamTest_done_Params, error) { + st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) + return StreamTest_done_Params(st), err +} + +func ReadRootStreamTest_done_Params(msg *capnp.Message) (StreamTest_done_Params, error) { + root, err := msg.Root() + return StreamTest_done_Params(root.Struct()), err +} + +func (s StreamTest_done_Params) String() string { + str, _ := text.Marshal(0x86370ae31868a1e2, capnp.Struct(s)) + return str +} + +func (s StreamTest_done_Params) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { + return capnp.Struct(s).EncodeAsPtr(seg) +} + +func (StreamTest_done_Params) DecodeFromPtr(p capnp.Ptr) StreamTest_done_Params { + return StreamTest_done_Params(capnp.Struct{}.DecodeFromPtr(p)) +} + +func (s StreamTest_done_Params) ToPtr() capnp.Ptr { + return capnp.Struct(s).ToPtr() +} +func (s StreamTest_done_Params) IsValid() bool { + return capnp.Struct(s).IsValid() +} + +func (s StreamTest_done_Params) Message() *capnp.Message { + return capnp.Struct(s).Message() +} + +func (s StreamTest_done_Params) Segment() *capnp.Segment { + return capnp.Struct(s).Segment() +} + +// StreamTest_done_Params_List is a list of StreamTest_done_Params. +type StreamTest_done_Params_List = capnp.StructList[StreamTest_done_Params] + +// NewStreamTest_done_Params creates a new list of StreamTest_done_Params. +func NewStreamTest_done_Params_List(s *capnp.Segment, sz int32) (StreamTest_done_Params_List, error) { + l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}, sz) + return capnp.StructList[StreamTest_done_Params](l), err +} + +// StreamTest_done_Params_Future is a wrapper for a StreamTest_done_Params promised by a client call. +type StreamTest_done_Params_Future struct{ *capnp.Future } + +func (f StreamTest_done_Params_Future) Struct() (StreamTest_done_Params, error) { + p, err := f.Future.Ptr() + return StreamTest_done_Params(p.Struct()), err +} + +type StreamTest_done_Results capnp.Struct + +// StreamTest_done_Results_TypeID is the unique identifier for the type StreamTest_done_Results. +const StreamTest_done_Results_TypeID = 0xebb3a7aa1f38c1b9 + +func NewStreamTest_done_Results(s *capnp.Segment) (StreamTest_done_Results, error) { + st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) + return StreamTest_done_Results(st), err +} + +func NewRootStreamTest_done_Results(s *capnp.Segment) (StreamTest_done_Results, error) { + st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) + return StreamTest_done_Results(st), err +} + +func ReadRootStreamTest_done_Results(msg *capnp.Message) (StreamTest_done_Results, error) { + root, err := msg.Root() + return StreamTest_done_Results(root.Struct()), err +} + +func (s StreamTest_done_Results) String() string { + str, _ := text.Marshal(0xebb3a7aa1f38c1b9, capnp.Struct(s)) + return str +} + +func (s StreamTest_done_Results) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { + return capnp.Struct(s).EncodeAsPtr(seg) +} + +func (StreamTest_done_Results) DecodeFromPtr(p capnp.Ptr) StreamTest_done_Results { + return StreamTest_done_Results(capnp.Struct{}.DecodeFromPtr(p)) +} + +func (s StreamTest_done_Results) ToPtr() capnp.Ptr { + return capnp.Struct(s).ToPtr() +} +func (s StreamTest_done_Results) IsValid() bool { + return capnp.Struct(s).IsValid() +} + +func (s StreamTest_done_Results) Message() *capnp.Message { + return capnp.Struct(s).Message() +} + +func (s StreamTest_done_Results) Segment() *capnp.Segment { + return capnp.Struct(s).Segment() +} + +// StreamTest_done_Results_List is a list of StreamTest_done_Results. +type StreamTest_done_Results_List = capnp.StructList[StreamTest_done_Results] + +// NewStreamTest_done_Results creates a new list of StreamTest_done_Results. +func NewStreamTest_done_Results_List(s *capnp.Segment, sz int32) (StreamTest_done_Results_List, error) { + l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}, sz) + return capnp.StructList[StreamTest_done_Results](l), err +} + +// StreamTest_done_Results_Future is a wrapper for a StreamTest_done_Results promised by a client call. +type StreamTest_done_Results_Future struct{ *capnp.Future } + +func (f StreamTest_done_Results_Future) Struct() (StreamTest_done_Results, error) { + p, err := f.Future.Ptr() + return StreamTest_done_Results(p.Struct()), err +} + type CapArgsTest capnp.Client // CapArgsTest_TypeID is the unique identifier for the type CapArgsTest. const CapArgsTest_TypeID = 0xb86bce7f916a10cc func (c CapArgsTest) Call(ctx context.Context, params func(CapArgsTest_call_Params) error) (CapArgsTest_call_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xb86bce7f916a10cc, @@ -556,10 +746,14 @@ func (c CapArgsTest) Call(ctx context.Context, params func(CapArgsTest_call_Para s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 1} s.PlaceArgs = func(s capnp.Struct) error { return params(CapArgsTest_call_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return CapArgsTest_call_Results_Future{Future: ans.Future()}, release + } + func (c CapArgsTest) Self(ctx context.Context, params func(CapArgsTest_self_Params) error) (CapArgsTest_self_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xb86bce7f916a10cc, @@ -572,8 +766,10 @@ func (c CapArgsTest) Self(ctx context.Context, params func(CapArgsTest_self_Para s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} s.PlaceArgs = func(s capnp.Struct) error { return params(CapArgsTest_self_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return CapArgsTest_self_Results_Future{Future: ans.Future()}, release + } // String returns a string that identifies this capability for debugging @@ -641,7 +837,9 @@ func (c CapArgsTest) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c CapArgsTest) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A CapArgsTest_Server is a CapArgsTest with a local implementation. +} + +// A CapArgsTest_Server is a CapArgsTest with a local implementation. type CapArgsTest_Server interface { Call(context.Context, CapArgsTest_call) error @@ -1043,6 +1241,7 @@ type PingPongProvider capnp.Client const PingPongProvider_TypeID = 0x95b6142577e93239 func (c PingPongProvider) PingPong(ctx context.Context, params func(PingPongProvider_pingPong_Params) error) (PingPongProvider_pingPong_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0x95b6142577e93239, @@ -1055,8 +1254,10 @@ func (c PingPongProvider) PingPong(ctx context.Context, params func(PingPongProv s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} s.PlaceArgs = func(s capnp.Struct) error { return params(PingPongProvider_pingPong_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return PingPongProvider_pingPong_Results_Future{Future: ans.Future()}, release + } // String returns a string that identifies this capability for debugging @@ -1124,7 +1325,9 @@ func (c PingPongProvider) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c PingPongProvider) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A PingPongProvider_Server is a PingPongProvider with a local implementation. +} + +// A PingPongProvider_Server is a PingPongProvider with a local implementation. type PingPongProvider_Server interface { PingPong(context.Context, PingPongProvider_pingPong) error } @@ -1339,55 +1542,61 @@ func (p PingPongProvider_pingPong_Results_Future) PingPong() PingPong { return PingPong(p.Future.Field(0, nil).Client()) } -const schema_ef12a34b9807e19c = "x\xda\x94TMH\x14o\x18\x7f\x9ey\xdf\xf9\x8f\xf2" + - "o\xd9\xde\x1d\xe9\x9b>D;H,\xa9\x04e\xd9j" + - "a\x0bF\xb2cz\xa8\x0e1\xe9\xa8k;\xeb2\xb3" + - "\x9b\x14\x94]\xc4S\x07\x0fiFt\xb0\x88:t(" + - "($/%\x05E\x12\x1d\x8a\x08\x93\xd2S\x11\x98\xd5" + - "E\x88\x9ax\xdf\xd9qg\xfd\x82\xae\xfb<\xf3\xfbz" + - "~\xef\xee|\x815\xb4<\x10Q@\xd2\x9a\xe4\xff\x9c" + - "\xe1\x8e\xbex\xc3\x85\x82\x8b\xc0\xd6\"\x80\x8c\x0a@e" + - "/\xd9\x80\x80\xea%\x12\x01\xfc\xb3\xbdtb\xe0\xf7d" + - "\xafV\x84\x08@\xf9\xf8.)\xe6\xe3\x07|\xec\xec\xa9" + - "\xf8\xd2]Z\xf4\xf02\xb0\xff\x89smJ\xb9r\xf8" + - "F\xe8\x1b\x00\xaao\xc9\xb4:E\x14\x00u\x92D\xd5" + - "B\xfe\xa5\x13\xd8:;\x16x\xf6k\xc0%\x13`\xdf" + - "9\x18u*\x98\xf3R\x1e?4\xe8\x97\xf1\xde\xe5\x99" + - "\x12<\xe3\xab;\xfb{^\x9d\x1eY\xc4\x83\xf4\x91\x8b" + - "\xae\xca\xb4Om\x16<\xc6\xcd\x19\x98\x98\xd5u&\xae\xb4\x1aV\x0cQ\xa3" + - "D\xf6e\x8f^\x04\x8c\xd5\x83\xc4\x0a\x15'\x95\xfd\x08" + - "\x00j0\x86\xb8\xb2EO\xd8\xb2[\xb6\x91h[R" + - "~YN~\x90/!\xcbU\x0d\x10\x99\xcf\x0bz\x98" + - "\x11\x17\x94\xdb(\x106\xbcG\x84^\xc1Yy\x19H" + - "\xacT\xc1\\G\xd0\xab8[\xcfg\x01%\xc8\x95\xd7" + - "\xb8\xac\xf9\x169\xd1\xd1\xb4e\xe8[L\x8f\xc7\x8d\xcb" + - "\xeb\x01&\xef=\xee\xae\xbczr\x881\x8e%+\xc1" + - "T\xc6\xee\xc8\x07\xa1\x0b\x93o5\xac\xb0\x97*\xef\x84" + - "\xa2\x9b\xf6\xca\xe7^\xa28+_{\xc9\xd4\xb3(y" + - "\xee\x04\x95\xc2\x95\xcd{\xf3\x1e\x1bB\xf6\x1f\x87\xb1\x03" + - "\xc2[OV\xce\xbf\xd8k4\xec`f\xc1\xa5\xeb\x01" + - "\xb4U\x04\xb5u\x12\xfa\xdb\x85,\xf7X\x17\x1c\x9cx" + - "w\x10g\x08\xf3\x8c\x85\x1b\x13\x97\xadP\xab\x9e\xd61" + - "\x00\x12\x06\x00\xff\x06\x00\x00\xff\xff\xfe*\x86\xac" +const schema_ef12a34b9807e19c = "x\xda\x94\x94]H\x1cW\x14\xc7\xcf\x99\xb9\xd3\xd1\xb6" + + "\xc3\xf6\xee\xd8V\xfbe\x15\xed\x83\xb4R\x95RkK" + + "wmi\xa5\x16d\xc7\xb6\x0f\xfd\x802uGw\xed" + + "~\xb1\xb3[\xa1`m\x1e\x8c\xcfyP\x93\x90\x044" + + "1_\x90\x07\x13\x124\x12\x88\x92@BLH \x90" + + "\x07#\x89\x86@DP\x89/BH&\xdc\x99\xbd\xee" + + "\xec\xae\x1a\xf2\xb6\xec9s\xfe\xbfs\xce\xff\xdcO\xd7" + + "\xd1O\xea\x94U\x19\x04\xed\x0f\xe9\x15k44\x10n" + + "\xfb\xaf\xe8\x7f\xa0o#\x80\x842@\xc3\xbc\xf8\x0e\x02" + + "\xaa\x0fE\x1f\xe0\xb3\x8f\xaa\xe7\x86\x9e\xce\xf7k%\x88" + + "\x00\x84\x85%R\xc9\xc2\x0a\xf1\x01Z\x8b#\xa1\xd2\x07" + + "\xaf~\xbe\x1b\xe8[<\xfe\x09\xf1\"\x10\xeb\x8b\xfa\xa5" + + "\x9e\xea\x92s\x83@_\x13\xad\x03\x0b\xf2\xde\x1f\x0f{" + + "W\x01P}\x93,\xaa\x15,S}\x9f\xb4\xa8?\xb0" + + "_\x96\xf2\xe1\xda\x8cr\xf9\xc9\x90\x83a\x97\xa9c2" + + "\xc4\xaa\xa7\xd65i\xf6\xfba7`\x99CPa\x13" + + "\xcc\xbe\xd1\xbd\xa7\xef\xc6\xdf\x13\x05:\xcd\xe4\xbcS]" + + "\xfd\x8e\x0c\xa8\x83\xb6\x8eqd\x05\x7f?\xfe\xd5TA" + + "r/9\xad\xf6\xdb\xc9\xbb\xc8\x80z\xcbN\xfex\xfd" + + "\x83\xbe\xe9\xcf\x1e\xdd\x06\xfa\x1e\x87\x9a$\x7f1\xa8\x8a" + + "\xe2{\x87N\xdc\x1f\xbe\x03\xae\xb1\x8c\x11{j\xa7l" + + "\xa8\xde\xa9\xa6\xce\xdf\xbe\xfee\xd1\xd5\xcfu\x16'\xd6" + + "\xe4tc\xf9\xc9cg\x96]\x03;\xebD\xfe]\xd9" + + "\x98I]\"k\x05l\x07\xc9\xa8:f\xb3\x8d\x90\x16" + + "\xf5\xaa\xcd\xb6|s\"\xfaeK\xf8\xb1\xc3\xe6\x8ce" + + "\x9ct3\x82\x0b6\xc1\xdc\xbb\xc9+G\xef6n8" + + ":N\xc2\x02\xdb\x0c\xaaK\xc4\x07\xbfZ)\xc3L\xd5" + + "v\xe8\x091\x96h\xfaVO4'\xbb\xcc\x9f\x9d\xbf" + + "\"\x91\xaa\x80\x9e\xd4\xc5\xa8\xa9\x11\x91\x00\x10\x04\xa0J" + + "%\x80V$\xa2V\"\xa0\xdc\xa1'\xd0KD@\xf4" + + "\x02\xe6T\x0a\x84c]\x81x\xac\xab\xd6\xe8\x08\xc5\xdb" + + "\xd2\xd1\xaav\xc3L\xcb\x91TN)o\xb6\x14\xc6P" + + "\x02\x01\xa5\xbc2?\xa5\x92\x86\x1e\xb5y\x82\xf1\x98a" + + "\xf3D\xd1\xdc\xcc\x11\\R\x81d\xfc\x9f\xb0\x1c4\x92" + + "\x01D\x8d\x88\x92ks\xc8\xc7Di+\x08\xb4X\xb6" + + "\x12\x99\x8f\x00\xc0\x8f\x01\xc4\x9d\xc7\xc0\xe1\xb7\xcd2\x8d" + + "H\xe7\x96-\xd6d[\xf4\xb0$\xa4Y\xa3\x02\"u" + + "\xf5\x8b\xbc\xa6\xcf)\xca\xda(\xb2\xdb\xe0\xc7\x89\xfc<" + + "h]\x0d\x08\xb4Z\xc6\xac\xc3\x90\x1f\x08-c1E" + + "\xf60r\xbf\xa3\x9a\xdb\"\xf2\xc1\x96Gsu\xb8W" + + "06~\xb1\xa7a\xff\x9f\xfb\\:\xfc\xc0\x91\x1bw" + + "S'\x916C~\xf4\xb0\xfd\xe4\xea\x90\xfc\xe5\x04\x8d" + + "d-\x1f<[\xa5\xacG\xcd\x9d]\xb3\x85\xff^l" + + "\x9a\x82\xc5d\xaa\xec\xe8,\xb6\xbc\x88\xe8Z1r\x1e" + + "\x99\xe1oZ\x8a\x9f\x0f" + + "\x00\x00\xff\xff\x1f\xcb\xbd\x93" func init() { schemas.Register(schema_ef12a34b9807e19c, 0x80087e4e698768a2, 0x85ddfd96db252600, + 0x86370ae31868a1e2, 0x95b6142577e93239, 0x96fbc50dc2f0200d, 0x9746cc05cbff1132, @@ -1396,6 +1605,7 @@ func init() { 0xd4e835c17f1ef32c, 0xd797e0a99edf0921, 0xe2553e5a663abb7d, + 0xebb3a7aa1f38c1b9, 0xf004c474c2f8ee7a, 0xf269473b6db8d0eb, 0xf838dca6c8721bdb) diff --git a/std/capnp/persistent/persistent.capnp.go b/std/capnp/persistent/persistent.capnp.go index 4ad24ed4..0ba886a8 100644 --- a/std/capnp/persistent/persistent.capnp.go +++ b/std/capnp/persistent/persistent.capnp.go @@ -20,6 +20,7 @@ type Persistent capnp.Client const Persistent_TypeID = 0xc8cb212fcd9f5691 func (c Persistent) Save(ctx context.Context, params func(Persistent_SaveParams) error) (Persistent_SaveResults_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xc8cb212fcd9f5691, @@ -32,8 +33,10 @@ func (c Persistent) Save(ctx context.Context, params func(Persistent_SaveParams) s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 1} s.PlaceArgs = func(s capnp.Struct) error { return params(Persistent_SaveParams(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return Persistent_SaveResults_Future{Future: ans.Future()}, release + } // String returns a string that identifies this capability for debugging @@ -101,7 +104,9 @@ func (c Persistent) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c Persistent) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A Persistent_Server is a Persistent with a local implementation. +} + +// A Persistent_Server is a Persistent with a local implementation. type Persistent_Server interface { Save(context.Context, Persistent_save) error } From 11fe96393cd3a40a49fdc6974fe38cbc3245f0c3 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Sun, 12 Feb 2023 18:58:21 -0500 Subject: [PATCH 05/11] Add failing test for streaming. --- rpc/streaming_test.go | 45 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 rpc/streaming_test.go diff --git a/rpc/streaming_test.go b/rpc/streaming_test.go new file mode 100644 index 00000000..d9d4e395 --- /dev/null +++ b/rpc/streaming_test.go @@ -0,0 +1,45 @@ +package rpc_test + +import ( + "context" + "fmt" + "testing" + + "capnproto.org/go/capnp/v3/rpc/internal/testcapnp" + "github.com/stretchr/testify/assert" +) + +// TestStreamingDoneErr verifies that if an error occurs in a streaming call, +// it shows up in a subsequent call to done(). +func TestStreamingDoneErr(t *testing.T) { + ctx := context.Background() + client := testcapnp.StreamTest_ServerToClient(&maxPushStream{limit: 0}) + defer client.Release() + assert.NoError(t, client.Push(ctx, nil)) + fut, rel := client.Done(ctx, nil) + defer rel() + _, err := fut.Struct() + assert.NotNil(t, err) +} + +// A maxPushStream is an implementation of StreamTest that +// starts returning errors after a specified number of calls. +type maxPushStream struct { + count int // How many calls have we seen? + limit int // How many calls are permitted? +} + +func (m *maxPushStream) Push(context.Context, testcapnp.StreamTest_push) error { + m.count++ + if m.count > m.limit { + return fmt.Errorf("Exceeded limit of %v calls", m.limit) + } + return nil +} + +func (m *maxPushStream) Done(context.Context, testcapnp.StreamTest_done) error { + // Note: important to always return nil here, so the tests can distinguish + // between an error carried over from a call to push() and one returned + // directly from done(). + return nil +} From a104f31f904147a2ac19bffbc6f14cfc67409bc5 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Mon, 13 Feb 2023 18:08:15 -0500 Subject: [PATCH 06/11] Tweak the streaming API to use a WaitStreaming() method. --- capability.go | 37 +++++++++++++++++++++++++++++++++++-- rpc/streaming_test.go | 22 +++++++++++++++------- 2 files changed, 50 insertions(+), 9 deletions(-) diff --git a/capability.go b/capability.go index 8bf20e63..980342be 100644 --- a/capability.go +++ b/capability.go @@ -125,7 +125,8 @@ type client struct { mu sync.Mutex // protects the struct limiter flowcontrol.FlowLimiter - streamError error // Last error from streaming calls. + streamError error // Last error from streaming calls. + streamWg *sync.WaitGroup h *clientHook // nil if resolved to nil or released released bool } @@ -394,15 +395,26 @@ func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) { // client will return the same error (without starting // the method or calling PlaceArgs). func (c Client) SendStreamCall(ctx context.Context, s Send) error { - var streamError error + var ( + streamError error + wg *sync.WaitGroup + ) syncutil.With(&c.mu, func() { streamError = c.streamError + if streamError == nil { + if c.streamWg == nil { + c.streamWg = &sync.WaitGroup{} + } + wg = c.streamWg + wg.Add(1) + } }) if streamError != nil { return streamError } ans, release := c.SendCall(ctx, s) go func() { + defer wg.Done() _, err := ans.Future().Ptr() release() if err != nil { @@ -414,6 +426,27 @@ func (c Client) SendStreamCall(ctx context.Context, s Send) error { return nil } +// WaitStreaming waits for all outstanding streaming calls (i.e. calls +// started with SendStreamCall) to complete, and then returns an error +// if any streaming call has failed. +func (c Client) WaitStreaming() error { + var ( + ret error + wg *sync.WaitGroup + ) + syncutil.With(&c.mu, func() { + wg = c.streamWg + }) + if wg == nil { + return nil + } + wg.Wait() + syncutil.With(&c.mu, func() { + ret = c.streamError + }) + return ret +} + // RecvCall starts executing a method with the referenced arguments // and returns an answer that will hold the result. The hook will call // a.Release when it no longer needs to reference the parameters. The diff --git a/rpc/streaming_test.go b/rpc/streaming_test.go index d9d4e395..4ffc7ea1 100644 --- a/rpc/streaming_test.go +++ b/rpc/streaming_test.go @@ -5,21 +5,29 @@ import ( "fmt" "testing" + "capnproto.org/go/capnp/v3" "capnproto.org/go/capnp/v3/rpc/internal/testcapnp" "github.com/stretchr/testify/assert" ) -// TestStreamingDoneErr verifies that if an error occurs in a streaming call, -// it shows up in a subsequent call to done(). -func TestStreamingDoneErr(t *testing.T) { +// TestStreamingWaitOk verifies that if no errors occur in streaming calls, +// WaitStreaming retnurs nil. +func TestStreamingWaitOk(t *testing.T) { + ctx := context.Background() + client := testcapnp.StreamTest_ServerToClient(&maxPushStream{limit: 1}) + defer client.Release() + assert.NoError(t, client.Push(ctx, nil)) + assert.NoError(t, capnp.Client(client).WaitStreaming()) +} + +// TestStreamingWaitErr verifies that if an error occurs in a streaming call, +// it shows up in a subsequent call to WaitStreaming(). +func TestStreamingWaitErr(t *testing.T) { ctx := context.Background() client := testcapnp.StreamTest_ServerToClient(&maxPushStream{limit: 0}) defer client.Release() assert.NoError(t, client.Push(ctx, nil)) - fut, rel := client.Done(ctx, nil) - defer rel() - _, err := fut.Struct() - assert.NotNil(t, err) + assert.NotNil(t, capnp.Client(client).WaitStreaming()) } // A maxPushStream is an implementation of StreamTest that From 55cc0aa5cef5cd098e48a34fe0aed650f6622e29 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Mon, 13 Feb 2023 18:11:27 -0500 Subject: [PATCH 07/11] testcapnp: Remove now unneeded done-method. WaitStreaming() obviates this. --- rpc/bench_test.go | 11 +- rpc/flow_test.go | 4 - rpc/internal/testcapnp/test.capnp | 1 - rpc/internal/testcapnp/test.capnp.go | 278 +++++---------------------- rpc/streaming_test.go | 7 - 5 files changed, 47 insertions(+), 254 deletions(-) diff --git a/rpc/bench_test.go b/rpc/bench_test.go index 5df0ee0f..ce664d01 100644 --- a/rpc/bench_test.go +++ b/rpc/bench_test.go @@ -59,11 +59,8 @@ func benchmarkStreaming(b *testing.B, cfg *benchmarkStreamingConfig) { } } } - fut, rel := bootstrap.Done(ctx, nil) - defer rel() - _, err := fut.Struct() - if err != nil { - b.Errorf("Error waiting on done() future: %v", err) + if err := capnp.Client(bootstrap).WaitStreaming(); err != nil { + b.Errorf("Error waiting on streaming calls: %v", err) } } @@ -75,10 +72,6 @@ func (nullStream) Push(context.Context, testcp.StreamTest_push) error { return nil } -func (nullStream) Done(context.Context, testcp.StreamTest_done) error { - return nil -} - func BenchmarkPingPong(b *testing.B) { p1, p2 := net.Pipe() srv := testcp.PingPong_ServerToClient(pingPongServer{}) diff --git a/rpc/flow_test.go b/rpc/flow_test.go index fabd7c8d..dd4f3973 100644 --- a/rpc/flow_test.go +++ b/rpc/flow_test.go @@ -144,7 +144,3 @@ func (slowStreamTestServer) Push(ctx context.Context, p testcapnp.StreamTest_pus time.Sleep(200 * time.Millisecond) return nil } - -func (slowStreamTestServer) Done(context.Context, testcapnp.StreamTest_done) error { - return nil -} diff --git a/rpc/internal/testcapnp/test.capnp b/rpc/internal/testcapnp/test.capnp index 1fc9e921..39c69ec6 100644 --- a/rpc/internal/testcapnp/test.capnp +++ b/rpc/internal/testcapnp/test.capnp @@ -12,7 +12,6 @@ interface PingPong { interface StreamTest { push @0 (data :Data) -> stream; - done @1 (); } interface CapArgsTest { diff --git a/rpc/internal/testcapnp/test.capnp.go b/rpc/internal/testcapnp/test.capnp.go index 3144adf9..1f8d3f48 100644 --- a/rpc/internal/testcapnp/test.capnp.go +++ b/rpc/internal/testcapnp/test.capnp.go @@ -337,26 +337,6 @@ func (c StreamTest) Push(ctx context.Context, params func(StreamTest_push_Params } -func (c StreamTest) Done(ctx context.Context, params func(StreamTest_done_Params) error) (StreamTest_done_Results_Future, capnp.ReleaseFunc) { - - s := capnp.Send{ - Method: capnp.Method{ - InterfaceID: 0xbb3ca85b01eea465, - MethodID: 1, - InterfaceName: "test.capnp:StreamTest", - MethodName: "done", - }, - } - if params != nil { - s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} - s.PlaceArgs = func(s capnp.Struct) error { return params(StreamTest_done_Params(s)) } - } - - ans, release := capnp.Client(c).SendCall(ctx, s) - return StreamTest_done_Results_Future{Future: ans.Future()}, release - -} - // String returns a string that identifies this capability for debugging // purposes. Its format should not be depended on: in particular, it // should not be used to compare clients. Use IsSame to compare clients @@ -427,8 +407,6 @@ func (c StreamTest) GetFlowLimiter() fc.FlowLimiter { // A StreamTest_Server is a StreamTest with a local implementation. type StreamTest_Server interface { Push(context.Context, StreamTest_push) error - - Done(context.Context, StreamTest_done) error } // StreamTest_NewServer creates a new Server from an implementation of StreamTest_Server. @@ -447,7 +425,7 @@ func StreamTest_ServerToClient(s StreamTest_Server) StreamTest { // This can be used to create a more complicated Server. func StreamTest_Methods(methods []server.Method, s StreamTest_Server) []server.Method { if cap(methods) == 0 { - methods = make([]server.Method, 0, 2) + methods = make([]server.Method, 0, 1) } methods = append(methods, server.Method{ @@ -462,18 +440,6 @@ func StreamTest_Methods(methods []server.Method, s StreamTest_Server) []server.M }, }) - methods = append(methods, server.Method{ - Method: capnp.Method{ - InterfaceID: 0xbb3ca85b01eea465, - MethodID: 1, - InterfaceName: "test.capnp:StreamTest", - MethodName: "done", - }, - Impl: func(ctx context.Context, call *server.Call) error { - return s.Done(ctx, StreamTest_done{call}) - }, - }) - return methods } @@ -494,23 +460,6 @@ func (c StreamTest_push) AllocResults() (stream.StreamResult, error) { return stream.StreamResult(r), err } -// StreamTest_done holds the state for a server call to StreamTest.done. -// See server.Call for documentation. -type StreamTest_done struct { - *server.Call -} - -// Args returns the call's arguments. -func (c StreamTest_done) Args() StreamTest_done_Params { - return StreamTest_done_Params(c.Call.Args()) -} - -// AllocResults allocates the results struct. -func (c StreamTest_done) AllocResults() (StreamTest_done_Results, error) { - r, err := c.Call.AllocResults(capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return StreamTest_done_Results(r), err -} - // StreamTest_List is a list of StreamTest. type StreamTest_List = capnp.CapList[StreamTest] @@ -597,136 +546,6 @@ func (f StreamTest_push_Params_Future) Struct() (StreamTest_push_Params, error) return StreamTest_push_Params(p.Struct()), err } -type StreamTest_done_Params capnp.Struct - -// StreamTest_done_Params_TypeID is the unique identifier for the type StreamTest_done_Params. -const StreamTest_done_Params_TypeID = 0x86370ae31868a1e2 - -func NewStreamTest_done_Params(s *capnp.Segment) (StreamTest_done_Params, error) { - st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return StreamTest_done_Params(st), err -} - -func NewRootStreamTest_done_Params(s *capnp.Segment) (StreamTest_done_Params, error) { - st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return StreamTest_done_Params(st), err -} - -func ReadRootStreamTest_done_Params(msg *capnp.Message) (StreamTest_done_Params, error) { - root, err := msg.Root() - return StreamTest_done_Params(root.Struct()), err -} - -func (s StreamTest_done_Params) String() string { - str, _ := text.Marshal(0x86370ae31868a1e2, capnp.Struct(s)) - return str -} - -func (s StreamTest_done_Params) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { - return capnp.Struct(s).EncodeAsPtr(seg) -} - -func (StreamTest_done_Params) DecodeFromPtr(p capnp.Ptr) StreamTest_done_Params { - return StreamTest_done_Params(capnp.Struct{}.DecodeFromPtr(p)) -} - -func (s StreamTest_done_Params) ToPtr() capnp.Ptr { - return capnp.Struct(s).ToPtr() -} -func (s StreamTest_done_Params) IsValid() bool { - return capnp.Struct(s).IsValid() -} - -func (s StreamTest_done_Params) Message() *capnp.Message { - return capnp.Struct(s).Message() -} - -func (s StreamTest_done_Params) Segment() *capnp.Segment { - return capnp.Struct(s).Segment() -} - -// StreamTest_done_Params_List is a list of StreamTest_done_Params. -type StreamTest_done_Params_List = capnp.StructList[StreamTest_done_Params] - -// NewStreamTest_done_Params creates a new list of StreamTest_done_Params. -func NewStreamTest_done_Params_List(s *capnp.Segment, sz int32) (StreamTest_done_Params_List, error) { - l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}, sz) - return capnp.StructList[StreamTest_done_Params](l), err -} - -// StreamTest_done_Params_Future is a wrapper for a StreamTest_done_Params promised by a client call. -type StreamTest_done_Params_Future struct{ *capnp.Future } - -func (f StreamTest_done_Params_Future) Struct() (StreamTest_done_Params, error) { - p, err := f.Future.Ptr() - return StreamTest_done_Params(p.Struct()), err -} - -type StreamTest_done_Results capnp.Struct - -// StreamTest_done_Results_TypeID is the unique identifier for the type StreamTest_done_Results. -const StreamTest_done_Results_TypeID = 0xebb3a7aa1f38c1b9 - -func NewStreamTest_done_Results(s *capnp.Segment) (StreamTest_done_Results, error) { - st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return StreamTest_done_Results(st), err -} - -func NewRootStreamTest_done_Results(s *capnp.Segment) (StreamTest_done_Results, error) { - st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return StreamTest_done_Results(st), err -} - -func ReadRootStreamTest_done_Results(msg *capnp.Message) (StreamTest_done_Results, error) { - root, err := msg.Root() - return StreamTest_done_Results(root.Struct()), err -} - -func (s StreamTest_done_Results) String() string { - str, _ := text.Marshal(0xebb3a7aa1f38c1b9, capnp.Struct(s)) - return str -} - -func (s StreamTest_done_Results) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { - return capnp.Struct(s).EncodeAsPtr(seg) -} - -func (StreamTest_done_Results) DecodeFromPtr(p capnp.Ptr) StreamTest_done_Results { - return StreamTest_done_Results(capnp.Struct{}.DecodeFromPtr(p)) -} - -func (s StreamTest_done_Results) ToPtr() capnp.Ptr { - return capnp.Struct(s).ToPtr() -} -func (s StreamTest_done_Results) IsValid() bool { - return capnp.Struct(s).IsValid() -} - -func (s StreamTest_done_Results) Message() *capnp.Message { - return capnp.Struct(s).Message() -} - -func (s StreamTest_done_Results) Segment() *capnp.Segment { - return capnp.Struct(s).Segment() -} - -// StreamTest_done_Results_List is a list of StreamTest_done_Results. -type StreamTest_done_Results_List = capnp.StructList[StreamTest_done_Results] - -// NewStreamTest_done_Results creates a new list of StreamTest_done_Results. -func NewStreamTest_done_Results_List(s *capnp.Segment, sz int32) (StreamTest_done_Results_List, error) { - l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}, sz) - return capnp.StructList[StreamTest_done_Results](l), err -} - -// StreamTest_done_Results_Future is a wrapper for a StreamTest_done_Results promised by a client call. -type StreamTest_done_Results_Future struct{ *capnp.Future } - -func (f StreamTest_done_Results_Future) Struct() (StreamTest_done_Results, error) { - p, err := f.Future.Ptr() - return StreamTest_done_Results(p.Struct()), err -} - type CapArgsTest capnp.Client // CapArgsTest_TypeID is the unique identifier for the type CapArgsTest. @@ -1542,61 +1361,55 @@ func (p PingPongProvider_pingPong_Results_Future) PingPong() PingPong { return PingPong(p.Future.Field(0, nil).Client()) } -const schema_ef12a34b9807e19c = "x\xda\x94\x94]H\x1cW\x14\xc7\xcf\x99\xb9\xd3\xd1\xb6" + - "\xc3\xf6\xee\xd8V\xfbe\x15\xed\x83\xb4R\x95RkK" + - "wmi\xa5\x16d\xc7\xb6\x0f\xfd\x802uGw\xed" + - "~\xb1\xb3[\xa1`m\x1e\x8c\xcfyP\x93\x90\x044" + - "1_\x90\x07\x13\x124\x12\x88\x92@BLH \x90" + - "\x07#\x89\x86@DP\x89/BH&\xdc\x99\xbd\xee" + - "\xec\xae\x1a\xf2\xb6\xec9s\xfe\xbfs\xce\xff\xdcO\xd7" + - "\xd1O\xea\x94U\x19\x04\xed\x0f\xe9\x15k44\x10n" + - "\xfb\xaf\xe8\x7f\xa0o#\x80\x842@\xc3\xbc\xf8\x0e\x02" + - "\xaa\x0fE\x1f\xe0\xb3\x8f\xaa\xe7\x86\x9e\xce\xf7k%\x88" + - "\x00\x84\x85%R\xc9\xc2\x0a\xf1\x01Z\x8b#\xa1\xd2\x07" + - "\xaf~\xbe\x1b\xe8[<\xfe\x09\xf1\"\x10\xeb\x8b\xfa\xa5" + - "\x9e\xea\x92s\x83@_\x13\xad\x03\x0b\xf2\xde\x1f\x0f{" + - "W\x01P}\x93,\xaa\x15,S}\x9f\xb4\xa8?\xb0" + - "_\x96\xf2\xe1\xda\x8cr\xf9\xc9\x90\x83a\x97\xa9c2" + - "\xc4\xaa\xa7\xd65i\xf6\xfba7`\x99CPa\x13" + - "\xcc\xbe\xd1\xbd\xa7\xef\xc6\xdf\x13\x05:\xcd\xe4\xbcS]" + - "\xfd\x8e\x0c\xa8\x83\xb6\x8eqd\x05\x7f?\xfe\xd5TA" + - "r/9\xad\xf6\xdb\xc9\xbb\xc8\x80z\xcbN\xfex\xfd" + - "\x83\xbe\xe9\xcf\x1e\xdd\x06\xfa\x1e\x87\x9a$\x7f1\xa8\x8a" + - "\xe2{\x87N\xdc\x1f\xbe\x03\xae\xb1\x8c\x11{j\xa7l" + - "\xa8\xde\xa9\xa6\xce\xdf\xbe\xfee\xd1\xd5\xcfu\x16'\xd6" + - "\xe4tc\xf9\xc9cg\x96]\x03;\xebD\xfe]\xd9" + - "\x98I]\"k\x05l\x07\xc9\xa8:f\xb3\x8d\x90\x16" + - "\xf5\xaa\xcd\xb6|s\"\xfaeK\xf8\xb1\xc3\xe6\x8ce" + - "\x9ct3\x82\x0b6\xc1\xdc\xbb\xc9+G\xef6n8" + - ":N\xc2\x02\xdb\x0c\xaaK\xc4\x07\xbfZ)\xc3L\xd5" + - "v\xe8\x091\x96h\xfaVO4'\xbb\xcc\x9f\x9d\xbf" + - "\"\x91\xaa\x80\x9e\xd4\xc5\xa8\xa9\x11\x91\x00\x10\x04\xa0J" + - "%\x80V$\xa2V\"\xa0\xdc\xa1'\xd0KD@\xf4" + - "\x02\xe6T\x0a\x84c]\x81x\xac\xab\xd6\xe8\x08\xc5\xdb" + - "\xd2\xd1\xaav\xc3L\xcb\x91TN)o\xb6\x14\xc6P" + - "\x02\x01\xa5\xbc2?\xa5\x92\x86\x1e\xb5y\x82\xf1\x98a" + - "\xf3D\xd1\xdc\xcc\x11\\R\x81d\xfc\x9f\xb0\x1c4\x92" + - "\x01D\x8d\x88\x92ks\xc8\xc7Di+\x08\xb4X\xb6" + - "\x12\x99\x8f\x00\xc0\x8f\x01\xc4\x9d\xc7\xc0\xe1\xb7\xcd2\x8d" + - "H\xe7\x96-\xd6d[\xf4\xb0$\xa4Y\xa3\x02\"u" + - "\xf5\x8b\xbc\xa6\xcf)\xca\xda(\xb2\xdb\xe0\xc7\x89\xfc<" + - "h]\x0d\x08\xb4Z\xc6\xac\xc3\x90\x1f\x08-c1E" + - "\xf60r\xbf\xa3\x9a\xdb\"\xf2\xc1\x96Gsu\xb8W" + - "06~\xb1\xa7a\xff\x9f\xfb\\:\xfc\xc0\x91\x1bw" + - "S'\x916C~\xf4\xb0\xfd\xe4\xea\x90\xfc\xe5\x04\x8d" + - "d-\x1f<[\xa5\xacG\xcd\x9d]\xb3\x85\xff^l" + - "\x9a\x82\xc5d\xaa\xec\xe8,\xb6\xbc\x88\xe8Z1r\x1e" + - "\x99\xe1oZ\x8a\x9f\x0f" + - "\x00\x00\xff\xff\x1f\xcb\xbd\x93" +const schema_ef12a34b9807e19c = "x\xda\x94TMH\x14o\x18\x7f\x9ey\xdf\xf9\x8f\xf2" + + "o\xd9\xde\x1d\xe9\x9b>D;H,\xa9\x04e\xd9j" + + "a\x0bF\xb2cz\xa8\x0e1\xe9\xa8k;\xeb2\xb3" + + "\x9b\x14\x94]\xc4S\x07\x0fiFt\xb0\x88:t(" + + "($/%\x05E\x12\x1d\x8a\x08\x93\xd2S\x11\x98\xd5" + + "E\x88\x9ax\xdf\xd9qg\xfd\x82\xae\xfb<\xf3\xfbz" + + "~\xef\xee|\x815\xb4<\x10Q@\xd2\x9a\xe4\xff\x9c" + + "\xe1\x8e\xbex\xc3\x85\x82\x8b\xc0\xd6\"\x80\x8c\x0a@e" + + "/\xd9\x80\x80\xea%\x12\x01\xfc\xb3\xbdtb\xe0\xf7d" + + "\xafV\x84\x08@\xf9\xf8.)\xe6\xe3\x07|\xec\xec\xa9" + + "\xf8\xd2]Z\xf4\xf02\xb0\xff\x89smJ\xb9r\xf8" + + "F\xe8\x1b\x00\xaao\xc9\xb4:E\x14\x00u\x92D\xd5" + + "B\xfe\xa5\x13\xd8:;\x16x\xf6k\xc0%\x13`\xdf" + + "9\x18u*\x98\xf3R\x1e?4\xe8\x97\xf1\xde\xe5\x99" + + "\x12<\xe3\xab;\xfb{^\x9d\x1eY\xc4\x83\xf4\x91\x8b" + + "\xae\xca\xb4Om\x16<\xc6\xcd\x19\x98\x98\xd5u&\xae\xb4\x1aV\x0cQ\xa3" + + "D\xf6e\x8f^\x04\x8c\xd5\x83\xc4\x0a\x15'\x95\xfd\x08" + + "\x00j0\x86\xb8\xb2EO\xd8\xb2[\xb6\x91h[R" + + "~YN~\x90/!\xcbU\x0d\x10\x99\xcf\x0bz\x98" + + "\x11\x17\x94\xdb(\x106\xbcG\x84^\xc1Yy\x19H" + + "\xacT\xc1\\G\xd0\xab8[\xcfg\x01%\xc8\x95\xd7" + + "\xb8\xac\xf9\x169\xd1\xd1\xb4e\xe8[L\x8f\xc7\x8d\xcb" + + "\xeb\x01&\xef=\xee\xae\xbczr\x881\x8e%+\xc1" + + "T\xc6\xee\xc8\x07\xa1\x0b\x93o5\xac\xb0\x97*\xef\x84" + + "\xa2\x9b\xf6\xca\xe7^\xa28+_{\xc9\xd4\xb3(y" + + "\xee\x04\x95\xc2\x95\xcd{\xf3\x1e\x1bB\xf6\x1f\x87\xb1\x03" + + "\xc2[OV\xce\xbf\xd8k4\xec`f\xc1\xa5\xeb\x01" + + "\xb4U\x04\xb5u\x12\xfa\xdb\x85,\xf7X\x17\x1c\x9cx" + + "w\x10g\x08\xf3\x8c\x85\x1b\x13\x97\xadP\xab\x9e\xd61" + + "\x00\x12\x06\x00\xff\x06\x00\x00\xff\xff\xfe*\x86\xac" func init() { schemas.Register(schema_ef12a34b9807e19c, 0x80087e4e698768a2, 0x85ddfd96db252600, - 0x86370ae31868a1e2, 0x95b6142577e93239, 0x96fbc50dc2f0200d, 0x9746cc05cbff1132, @@ -1605,7 +1418,6 @@ func init() { 0xd4e835c17f1ef32c, 0xd797e0a99edf0921, 0xe2553e5a663abb7d, - 0xebb3a7aa1f38c1b9, 0xf004c474c2f8ee7a, 0xf269473b6db8d0eb, 0xf838dca6c8721bdb) diff --git a/rpc/streaming_test.go b/rpc/streaming_test.go index 4ffc7ea1..2cfeaf6c 100644 --- a/rpc/streaming_test.go +++ b/rpc/streaming_test.go @@ -44,10 +44,3 @@ func (m *maxPushStream) Push(context.Context, testcapnp.StreamTest_push) error { } return nil } - -func (m *maxPushStream) Done(context.Context, testcapnp.StreamTest_done) error { - // Note: important to always return nil here, so the tests can distinguish - // between an error carried over from a call to push() and one returned - // directly from done(). - return nil -} From 65fab2a4f965b9239c68771f312ac14d5ef1cfc1 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Mon, 13 Feb 2023 18:14:51 -0500 Subject: [PATCH 08/11] Define WaitStreaming() on generated interface types ...so we don't have to cast to Client anymore --- capnpc-go/templates/interfaceClient | 4 ++++ flowcontrol/internal/test-tool/writer.capnp.go | 4 ++++ internal/aircraftlib/aircraft.capnp.go | 12 ++++++++++++ rpc/bench_test.go | 2 +- rpc/internal/testcapnp/test.capnp.go | 16 ++++++++++++++++ rpc/streaming_test.go | 5 ++--- std/capnp/persistent/persistent.capnp.go | 4 ++++ 7 files changed, 43 insertions(+), 4 deletions(-) diff --git a/capnpc-go/templates/interfaceClient b/capnpc-go/templates/interfaceClient index 279810f6..f76e1611 100644 --- a/capnpc-go/templates/interfaceClient +++ b/capnpc-go/templates/interfaceClient @@ -30,6 +30,10 @@ func (c {{$.Node.Name}}) {{.Name|title}}(ctx {{$.G.Imports.Context}}.Context, pa {{end}} +func (c {{$.Node.Name}}) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() +} + // String returns a string that identifies this capability for debugging // purposes. Its format should not be depended on: in particular, it // should not be used to compare clients. Use IsSame to compare clients diff --git a/flowcontrol/internal/test-tool/writer.capnp.go b/flowcontrol/internal/test-tool/writer.capnp.go index 56067848..7094056a 100644 --- a/flowcontrol/internal/test-tool/writer.capnp.go +++ b/flowcontrol/internal/test-tool/writer.capnp.go @@ -37,6 +37,10 @@ func (c Writer) Write(ctx context.Context, params func(Writer_write_Params) erro } +func (c Writer) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() +} + // String returns a string that identifies this capability for debugging // purposes. Its format should not be depended on: in particular, it // should not be used to compare clients. Use IsSame to compare clients diff --git a/internal/aircraftlib/aircraft.capnp.go b/internal/aircraftlib/aircraft.capnp.go index 5ef218c7..35e87cb2 100644 --- a/internal/aircraftlib/aircraft.capnp.go +++ b/internal/aircraftlib/aircraft.capnp.go @@ -5073,6 +5073,10 @@ func (c Echo) Echo(ctx context.Context, params func(Echo_echo_Params) error) (Ec } +func (c Echo) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() +} + // String returns a string that identifies this capability for debugging // purposes. Its format should not be depended on: in particular, it // should not be used to compare clients. Use IsSame to compare clients @@ -5863,6 +5867,10 @@ func (c CallSequence) GetNumber(ctx context.Context, params func(CallSequence_ge } +func (c CallSequence) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() +} + // String returns a string that identifies this capability for debugging // purposes. Its format should not be depended on: in particular, it // should not be used to compare clients. Use IsSame to compare clients @@ -6177,6 +6185,10 @@ func (c Pipeliner) GetNumber(ctx context.Context, params func(CallSequence_getNu } +func (c Pipeliner) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() +} + // String returns a string that identifies this capability for debugging // purposes. Its format should not be depended on: in particular, it // should not be used to compare clients. Use IsSame to compare clients diff --git a/rpc/bench_test.go b/rpc/bench_test.go index ce664d01..8db08ae6 100644 --- a/rpc/bench_test.go +++ b/rpc/bench_test.go @@ -59,7 +59,7 @@ func benchmarkStreaming(b *testing.B, cfg *benchmarkStreamingConfig) { } } } - if err := capnp.Client(bootstrap).WaitStreaming(); err != nil { + if err := bootstrap.WaitStreaming(); err != nil { b.Errorf("Error waiting on streaming calls: %v", err) } } diff --git a/rpc/internal/testcapnp/test.capnp.go b/rpc/internal/testcapnp/test.capnp.go index 1f8d3f48..e68dcdce 100644 --- a/rpc/internal/testcapnp/test.capnp.go +++ b/rpc/internal/testcapnp/test.capnp.go @@ -38,6 +38,10 @@ func (c PingPong) EchoNum(ctx context.Context, params func(PingPong_echoNum_Para } +func (c PingPong) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() +} + // String returns a string that identifies this capability for debugging // purposes. Its format should not be depended on: in particular, it // should not be used to compare clients. Use IsSame to compare clients @@ -337,6 +341,10 @@ func (c StreamTest) Push(ctx context.Context, params func(StreamTest_push_Params } +func (c StreamTest) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() +} + // String returns a string that identifies this capability for debugging // purposes. Its format should not be depended on: in particular, it // should not be used to compare clients. Use IsSame to compare clients @@ -591,6 +599,10 @@ func (c CapArgsTest) Self(ctx context.Context, params func(CapArgsTest_self_Para } +func (c CapArgsTest) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() +} + // String returns a string that identifies this capability for debugging // purposes. Its format should not be depended on: in particular, it // should not be used to compare clients. Use IsSame to compare clients @@ -1079,6 +1091,10 @@ func (c PingPongProvider) PingPong(ctx context.Context, params func(PingPongProv } +func (c PingPongProvider) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() +} + // String returns a string that identifies this capability for debugging // purposes. Its format should not be depended on: in particular, it // should not be used to compare clients. Use IsSame to compare clients diff --git a/rpc/streaming_test.go b/rpc/streaming_test.go index 2cfeaf6c..a0d3b812 100644 --- a/rpc/streaming_test.go +++ b/rpc/streaming_test.go @@ -5,7 +5,6 @@ import ( "fmt" "testing" - "capnproto.org/go/capnp/v3" "capnproto.org/go/capnp/v3/rpc/internal/testcapnp" "github.com/stretchr/testify/assert" ) @@ -17,7 +16,7 @@ func TestStreamingWaitOk(t *testing.T) { client := testcapnp.StreamTest_ServerToClient(&maxPushStream{limit: 1}) defer client.Release() assert.NoError(t, client.Push(ctx, nil)) - assert.NoError(t, capnp.Client(client).WaitStreaming()) + assert.NoError(t, client.WaitStreaming()) } // TestStreamingWaitErr verifies that if an error occurs in a streaming call, @@ -27,7 +26,7 @@ func TestStreamingWaitErr(t *testing.T) { client := testcapnp.StreamTest_ServerToClient(&maxPushStream{limit: 0}) defer client.Release() assert.NoError(t, client.Push(ctx, nil)) - assert.NotNil(t, capnp.Client(client).WaitStreaming()) + assert.NotNil(t, client.WaitStreaming()) } // A maxPushStream is an implementation of StreamTest that diff --git a/std/capnp/persistent/persistent.capnp.go b/std/capnp/persistent/persistent.capnp.go index 0ba886a8..9a7ab855 100644 --- a/std/capnp/persistent/persistent.capnp.go +++ b/std/capnp/persistent/persistent.capnp.go @@ -39,6 +39,10 @@ func (c Persistent) Save(ctx context.Context, params func(Persistent_SaveParams) } +func (c Persistent) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() +} + // String returns a string that identifies this capability for debugging // purposes. Its format should not be depended on: in particular, it // should not be used to compare clients. Use IsSame to compare clients From ebb8c33226660e59107337aaeec51088b3dbf0d6 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Mon, 13 Feb 2023 18:47:18 -0500 Subject: [PATCH 09/11] Add stuff about streaming to the docs. --- ...Remote-Procedure-Calls-using-Interfaces.md | 92 +++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/docs/Remote-Procedure-Calls-using-Interfaces.md b/docs/Remote-Procedure-Calls-using-Interfaces.md index 7b1c15b3..112c6976 100644 --- a/docs/Remote-Procedure-Calls-using-Interfaces.md +++ b/docs/Remote-Procedure-Calls-using-Interfaces.md @@ -238,6 +238,98 @@ It isn't until the `Struct()` method is called on a method that the `client` fun A few additional words on the Future type are in order. If your RPC method returns another interface type, you can use the Future to immediately make calls against that as-of-yet-unreturned interface. This relies on a feature of the Cap'n Proto RPC protocol called [promise pipelining][pipelining], the advantage of which is that Cap'n Proto can often optimize away the additional network round-trips when such method calls are chained. This is one of Cap'n Proto's key advantages, which we will use heavily in the next chapter. +## Streaming and Backpressure + +Cap'n Proto supports streaming workflows. Unlike other RPC protocols +such as grpc, this can be done without any dedicated "streaming" +construct. Instead, you can define an interface such as: + +```capnp +interface ByteStream { + write @0 (data :Data); + done @1 (); +} +``` + +The above is roughly analogous to the `io.WriteCloser` interface. If you +have a `ByteStream` interface, you can write your data into it in +chunks, and then call done() to signal that all data has been +written. + +There are however two wrinkles. + +The first is flow control. If you naively call write() in a loop, Cap'n +Proto will not by default provide any backpressure, resulting in excess +memory usage and high latency. But waiting for each call in turn results +in low throughput. To solve this, you need to attach a flow limiter, +from the `flowcontrol` package. You can do this with `SetFlowLimiter` on +any capability: + +```go +import "capnproto.org/go/capnp/v3/flowcontrol" + +// ... + +// Limits in-flight data to 2^16 bytes = 64KiB: +client.SetFlowLimiter(flowcontrol.NewFixedLimiter(1 << 16)) +``` + +If too much data is already in-flight, This will cause future rpc calls +to block until some existing ones have returned. + +The second wrinkle is dealing with return values: even though +`ByteStream.write()` has no return value, each call will return a future +and ReleaseFunc which must be waited on at some point. You could +accumulate these in a slice and wait on each of them at the end, but +there is a better option: the schema language supports a special +`stream` return type: + +```capnp +interface ByteStream { + write @0 (data :Data) -> stream; + done @1 (); +} +``` + +If the return type of a method is `stream`, instead of returning +a future and `ReleaseFunc`, the generated method will just return an +error: + +```diff +- func (c ByteStream) Write(ctx context.Context, params func(ByteStream_write_Params) error) (stream.ByteStream_write_Results_Future, capnp.ReleaseFunc) { ++ func (c ByteStream) Write(ctx context.Context, params func(ByteStream_write_Params) error) error { +``` + +The implementation will take care of waiting on the Future. If the +error is non-nil, it means some prior streaming call failed; this +can be useful for short-circuiting long streaming workflows. + +Additionally, each client type has a `WaitStreaming` method, which should +be called at the end of a streaming workflow. A full example might look +like: + +```go +for chunk := range chunks { + err := client.Write(ctx, func(p ByteStream_write_Params) error { + return p.SetData(chunk) + }) + if err != nil { + return err + } +} + +future, release := client.Done(ctx, nil) +defer release() +_, err := future.Struct() +if err != nil { + return err +} + +if err := client.WaitStreaming(); err != nil { + return err +} +``` + [rpc]: https://capnproto.org/rpc.html [pipelining]: https://capnproto.org/news/2013-12-13-promise-pipelining-capnproto-vs-ice.html From 45782212f795ad8698313caf35af6c76b8163051 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Mon, 13 Feb 2023 19:21:07 -0500 Subject: [PATCH 10/11] Get rid of lazy init for Client.streamWg --- capability.go | 51 +++++++++++++++++++-------------------------------- 1 file changed, 19 insertions(+), 32 deletions(-) diff --git a/capability.go b/capability.go index 980342be..fb5b9d18 100644 --- a/capability.go +++ b/capability.go @@ -123,12 +123,15 @@ type client struct { creatorStack string creatorLine int - mu sync.Mutex // protects the struct - limiter flowcontrol.FlowLimiter - streamError error // Last error from streaming calls. - streamWg *sync.WaitGroup - h *clientHook // nil if resolved to nil or released - released bool + mu sync.Mutex // protects the struct + limiter flowcontrol.FlowLimiter + h *clientHook // nil if resolved to nil or released + released bool + + stream struct { + err error // Last error from streaming calls. + wg sync.WaitGroup // Outstanding calls. + } } // clientHook is a reference-counted wrapper for a ClientHook. @@ -324,8 +327,8 @@ func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) { return ErrorAnswer(s.Method, errors.New("call on null client")), func() {} } - if c.streamError != nil { - return ErrorAnswer(s.Method, exc.WrapError("stream error", c.streamError)), func() {} + if c.stream.err != nil { + return ErrorAnswer(s.Method, exc.WrapError("stream error", c.stream.err)), func() {} } limiter := c.GetFlowLimiter() @@ -395,18 +398,11 @@ func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) { // client will return the same error (without starting // the method or calling PlaceArgs). func (c Client) SendStreamCall(ctx context.Context, s Send) error { - var ( - streamError error - wg *sync.WaitGroup - ) + var streamError error syncutil.With(&c.mu, func() { - streamError = c.streamError + streamError = c.stream.err if streamError == nil { - if c.streamWg == nil { - c.streamWg = &sync.WaitGroup{} - } - wg = c.streamWg - wg.Add(1) + c.stream.wg.Add(1) } }) if streamError != nil { @@ -414,12 +410,12 @@ func (c Client) SendStreamCall(ctx context.Context, s Send) error { } ans, release := c.SendCall(ctx, s) go func() { - defer wg.Done() + defer c.stream.wg.Done() _, err := ans.Future().Ptr() release() if err != nil { syncutil.With(&c.mu, func() { - c.streamError = err + c.stream.err = err }) } }() @@ -430,19 +426,10 @@ func (c Client) SendStreamCall(ctx context.Context, s Send) error { // started with SendStreamCall) to complete, and then returns an error // if any streaming call has failed. func (c Client) WaitStreaming() error { - var ( - ret error - wg *sync.WaitGroup - ) - syncutil.With(&c.mu, func() { - wg = c.streamWg - }) - if wg == nil { - return nil - } - wg.Wait() + c.stream.wg.Wait() + var ret error syncutil.With(&c.mu, func() { - ret = c.streamError + ret = c.stream.err }) return ret } From d2e85b5806bf24c182a243c40732166960bf58f5 Mon Sep 17 00:00:00 2001 From: Ian Denhardt Date: Mon, 13 Feb 2023 19:32:28 -0500 Subject: [PATCH 11/11] Hold lock while accessing c.stream.err For some reason I'd been under the impression that startCall acquired this and released it in finish(), but that's not the case. --- capability.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/capability.go b/capability.go index fb5b9d18..1708ce4c 100644 --- a/capability.go +++ b/capability.go @@ -327,8 +327,13 @@ func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) { return ErrorAnswer(s.Method, errors.New("call on null client")), func() {} } - if c.stream.err != nil { - return ErrorAnswer(s.Method, exc.WrapError("stream error", c.stream.err)), func() {} + var err error + syncutil.With(&c.mu, func() { + err = c.stream.err + }) + + if err != nil { + return ErrorAnswer(s.Method, exc.WrapError("stream error", err)), func() {} } limiter := c.GetFlowLimiter()