diff --git a/capability.go b/capability.go index 85cbf280..1708ce4c 100644 --- a/capability.go +++ b/capability.go @@ -8,6 +8,7 @@ import ( "strconv" "sync" + "capnproto.org/go/capnp/v3/exc" "capnproto.org/go/capnp/v3/exp/bufferpool" "capnproto.org/go/capnp/v3/flowcontrol" "capnproto.org/go/capnp/v3/internal/str" @@ -126,6 +127,11 @@ type client struct { limiter flowcontrol.FlowLimiter h *clientHook // nil if resolved to nil or released released bool + + stream struct { + err error // Last error from streaming calls. + wg sync.WaitGroup // Outstanding calls. + } } // clientHook is a reference-counted wrapper for a ClientHook. @@ -321,6 +327,15 @@ func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) { return ErrorAnswer(s.Method, errors.New("call on null client")), func() {} } + var err error + syncutil.With(&c.mu, func() { + err = c.stream.err + }) + + if err != nil { + return ErrorAnswer(s.Method, exc.WrapError("stream error", err)), func() {} + } + limiter := c.GetFlowLimiter() // We need to call PlaceArgs before we will know the size of message for @@ -381,6 +396,49 @@ func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) { return ans, rel } +// SendStreamCall is like SendCall except that: +// +// 1. It does not return an answer for the eventual result. +// 2. If the call returns an error, all future calls on this +// client will return the same error (without starting +// the method or calling PlaceArgs). +func (c Client) SendStreamCall(ctx context.Context, s Send) error { + var streamError error + syncutil.With(&c.mu, func() { + streamError = c.stream.err + if streamError == nil { + c.stream.wg.Add(1) + } + }) + if streamError != nil { + return streamError + } + ans, release := c.SendCall(ctx, s) + go func() { + defer c.stream.wg.Done() + _, err := ans.Future().Ptr() + release() + if err != nil { + syncutil.With(&c.mu, func() { + c.stream.err = err + }) + } + }() + return nil +} + +// WaitStreaming waits for all outstanding streaming calls (i.e. calls +// started with SendStreamCall) to complete, and then returns an error +// if any streaming call has failed. +func (c Client) WaitStreaming() error { + c.stream.wg.Wait() + var ret error + syncutil.With(&c.mu, func() { + ret = c.stream.err + }) + return ret +} + // RecvCall starts executing a method with the referenced arguments // and returns an answer that will hold the result. The hook will call // a.Release when it no longer needs to reference the parameters. The diff --git a/capnpc-go/nodes.go b/capnpc-go/nodes.go index e68c4b8a..8a71a2ee 100644 --- a/capnpc-go/nodes.go +++ b/capnpc-go/nodes.go @@ -7,6 +7,7 @@ import ( "capnproto.org/go/capnp/v3" "capnproto.org/go/capnp/v3/internal/schema" + "capnproto.org/go/capnp/v3/std/capnp/stream" ) type node struct { @@ -100,6 +101,10 @@ type interfaceMethod struct { Results *node } +func (m interfaceMethod) IsStreaming() bool { + return m.Results.Id() == stream.StreamResult_TypeID +} + func methodSet(methods []interfaceMethod, n *node, nodes nodeMap) ([]interfaceMethod, error) { ms, _ := n.Interface().Methods() for i := 0; i < ms.Len(); i++ { diff --git a/capnpc-go/templates/interfaceClient b/capnpc-go/templates/interfaceClient index 8d419958..f76e1611 100644 --- a/capnpc-go/templates/interfaceClient +++ b/capnpc-go/templates/interfaceClient @@ -6,7 +6,11 @@ type {{.Node.Name}} capnp.Client {{ template "_typeid" .Node }} {{range .Methods -}} -func (c {{$.Node.Name}}) {{.Name|title}}(ctx {{$.G.Imports.Context}}.Context, params func({{$.G.RemoteNodeName .Params $.Node}}) error) ({{$.G.RemoteNodeName .Results $.Node}}_Future, capnp.ReleaseFunc) { + +func (c {{$.Node.Name}}) {{.Name|title}}(ctx {{$.G.Imports.Context}}.Context, params func({{$.G.RemoteNodeName .Params $.Node}}) error) +{{- if .IsStreaming }} error { +{{- else }} ({{$.G.RemoteNodeName .Results $.Node}}_Future, capnp.ReleaseFunc) { +{{ end }} s := capnp.Send{ Method: capnp.Method{ {{template "_interfaceMethod" .}} @@ -16,11 +20,20 @@ func (c {{$.Node.Name}}) {{.Name|title}}(ctx {{$.G.Imports.Context}}.Context, pa s.ArgsSize = {{$.G.ObjectSize .Params}} s.PlaceArgs = func(s capnp.Struct) error { return params({{$.G.RemoteNodeName .Params $.Node}}(s)) } } +{{ if .IsStreaming }} + return capnp.Client(c).SendStreamCall(ctx, s) +{{ else }} ans, release := capnp.Client(c).SendCall(ctx, s) return {{$.G.RemoteNodeName .Results $.Node}}_Future{Future: ans.Future()}, release +{{ end }} } + {{end}} +func (c {{$.Node.Name}}) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() +} + // String returns a string that identifies this capability for debugging // purposes. Its format should not be depended on: in particular, it // should not be used to compare clients. Use IsSame to compare clients @@ -86,4 +99,4 @@ func (c {{$.Node.Name}}) SetFlowLimiter(lim {{.G.Imports.FlowControl}}.FlowLimit // for this client. func (c {{$.Node.Name}}) GetFlowLimiter() {{.G.Imports.FlowControl}}.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} \ No newline at end of file +} diff --git a/docs/Remote-Procedure-Calls-using-Interfaces.md b/docs/Remote-Procedure-Calls-using-Interfaces.md index 7b1c15b3..112c6976 100644 --- a/docs/Remote-Procedure-Calls-using-Interfaces.md +++ b/docs/Remote-Procedure-Calls-using-Interfaces.md @@ -238,6 +238,98 @@ It isn't until the `Struct()` method is called on a method that the `client` fun A few additional words on the Future type are in order. If your RPC method returns another interface type, you can use the Future to immediately make calls against that as-of-yet-unreturned interface. This relies on a feature of the Cap'n Proto RPC protocol called [promise pipelining][pipelining], the advantage of which is that Cap'n Proto can often optimize away the additional network round-trips when such method calls are chained. This is one of Cap'n Proto's key advantages, which we will use heavily in the next chapter. +## Streaming and Backpressure + +Cap'n Proto supports streaming workflows. Unlike other RPC protocols +such as grpc, this can be done without any dedicated "streaming" +construct. Instead, you can define an interface such as: + +```capnp +interface ByteStream { + write @0 (data :Data); + done @1 (); +} +``` + +The above is roughly analogous to the `io.WriteCloser` interface. If you +have a `ByteStream` interface, you can write your data into it in +chunks, and then call done() to signal that all data has been +written. + +There are however two wrinkles. + +The first is flow control. If you naively call write() in a loop, Cap'n +Proto will not by default provide any backpressure, resulting in excess +memory usage and high latency. But waiting for each call in turn results +in low throughput. To solve this, you need to attach a flow limiter, +from the `flowcontrol` package. You can do this with `SetFlowLimiter` on +any capability: + +```go +import "capnproto.org/go/capnp/v3/flowcontrol" + +// ... + +// Limits in-flight data to 2^16 bytes = 64KiB: +client.SetFlowLimiter(flowcontrol.NewFixedLimiter(1 << 16)) +``` + +If too much data is already in-flight, This will cause future rpc calls +to block until some existing ones have returned. + +The second wrinkle is dealing with return values: even though +`ByteStream.write()` has no return value, each call will return a future +and ReleaseFunc which must be waited on at some point. You could +accumulate these in a slice and wait on each of them at the end, but +there is a better option: the schema language supports a special +`stream` return type: + +```capnp +interface ByteStream { + write @0 (data :Data) -> stream; + done @1 (); +} +``` + +If the return type of a method is `stream`, instead of returning +a future and `ReleaseFunc`, the generated method will just return an +error: + +```diff +- func (c ByteStream) Write(ctx context.Context, params func(ByteStream_write_Params) error) (stream.ByteStream_write_Results_Future, capnp.ReleaseFunc) { ++ func (c ByteStream) Write(ctx context.Context, params func(ByteStream_write_Params) error) error { +``` + +The implementation will take care of waiting on the Future. If the +error is non-nil, it means some prior streaming call failed; this +can be useful for short-circuiting long streaming workflows. + +Additionally, each client type has a `WaitStreaming` method, which should +be called at the end of a streaming workflow. A full example might look +like: + +```go +for chunk := range chunks { + err := client.Write(ctx, func(p ByteStream_write_Params) error { + return p.SetData(chunk) + }) + if err != nil { + return err + } +} + +future, release := client.Done(ctx, nil) +defer release() +_, err := future.Struct() +if err != nil { + return err +} + +if err := client.WaitStreaming(); err != nil { + return err +} +``` + [rpc]: https://capnproto.org/rpc.html [pipelining]: https://capnproto.org/news/2013-12-13-promise-pipelining-capnproto-vs-ice.html diff --git a/flowcontrol/internal/test-tool/writer.capnp.go b/flowcontrol/internal/test-tool/writer.capnp.go index 471cad5c..7094056a 100644 --- a/flowcontrol/internal/test-tool/writer.capnp.go +++ b/flowcontrol/internal/test-tool/writer.capnp.go @@ -18,6 +18,7 @@ type Writer capnp.Client const Writer_TypeID = 0xf82e58b4a78f136b func (c Writer) Write(ctx context.Context, params func(Writer_write_Params) error) (Writer_write_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xf82e58b4a78f136b, @@ -30,8 +31,14 @@ func (c Writer) Write(ctx context.Context, params func(Writer_write_Params) erro s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 1} s.PlaceArgs = func(s capnp.Struct) error { return params(Writer_write_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return Writer_write_Results_Future{Future: ans.Future()}, release + +} + +func (c Writer) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() } // String returns a string that identifies this capability for debugging @@ -99,7 +106,9 @@ func (c Writer) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c Writer) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A Writer_Server is a Writer with a local implementation. +} + +// A Writer_Server is a Writer with a local implementation. type Writer_Server interface { Write(context.Context, Writer_write) error } @@ -236,9 +245,9 @@ func NewWriter_write_Params_List(s *capnp.Segment, sz int32) (Writer_write_Param // Writer_write_Params_Future is a wrapper for a Writer_write_Params promised by a client call. type Writer_write_Params_Future struct{ *capnp.Future } -func (p Writer_write_Params_Future) Struct() (Writer_write_Params, error) { - s, err := p.Future.Struct() - return Writer_write_Params(s), err +func (f Writer_write_Params_Future) Struct() (Writer_write_Params, error) { + p, err := f.Future.Ptr() + return Writer_write_Params(p.Struct()), err } type Writer_write_Results capnp.Struct @@ -301,9 +310,9 @@ func NewWriter_write_Results_List(s *capnp.Segment, sz int32) (Writer_write_Resu // Writer_write_Results_Future is a wrapper for a Writer_write_Results promised by a client call. type Writer_write_Results_Future struct{ *capnp.Future } -func (p Writer_write_Results_Future) Struct() (Writer_write_Results, error) { - s, err := p.Future.Struct() - return Writer_write_Results(s), err +func (f Writer_write_Results_Future) Struct() (Writer_write_Results, error) { + p, err := f.Future.Ptr() + return Writer_write_Results(p.Struct()), err } const schema_aca73f831c7ebfdd = "x\xda\x12\xa8u`1\xe4\xcdgb`\x0a\x94ae" + diff --git a/internal/aircraftlib/aircraft.capnp.go b/internal/aircraftlib/aircraft.capnp.go index 9b2baa09..35e87cb2 100644 --- a/internal/aircraftlib/aircraft.capnp.go +++ b/internal/aircraftlib/aircraft.capnp.go @@ -5054,6 +5054,7 @@ type Echo capnp.Client const Echo_TypeID = 0x8e5322c1e9282534 func (c Echo) Echo(ctx context.Context, params func(Echo_echo_Params) error) (Echo_echo_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0x8e5322c1e9282534, @@ -5066,8 +5067,14 @@ func (c Echo) Echo(ctx context.Context, params func(Echo_echo_Params) error) (Ec s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 1} s.PlaceArgs = func(s capnp.Struct) error { return params(Echo_echo_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return Echo_echo_Results_Future{Future: ans.Future()}, release + +} + +func (c Echo) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() } // String returns a string that identifies this capability for debugging @@ -5135,7 +5142,9 @@ func (c Echo) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c Echo) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A Echo_Server is a Echo with a local implementation. +} + +// A Echo_Server is a Echo with a local implementation. type Echo_Server interface { Echo(context.Context, Echo_echo) error } @@ -5839,6 +5848,7 @@ type CallSequence capnp.Client const CallSequence_TypeID = 0xabaedf5f7817c820 func (c CallSequence) GetNumber(ctx context.Context, params func(CallSequence_getNumber_Params) error) (CallSequence_getNumber_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xabaedf5f7817c820, @@ -5851,8 +5861,14 @@ func (c CallSequence) GetNumber(ctx context.Context, params func(CallSequence_ge s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} s.PlaceArgs = func(s capnp.Struct) error { return params(CallSequence_getNumber_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return CallSequence_getNumber_Results_Future{Future: ans.Future()}, release + +} + +func (c CallSequence) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() } // String returns a string that identifies this capability for debugging @@ -5920,7 +5936,9 @@ func (c CallSequence) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c CallSequence) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A CallSequence_Server is a CallSequence with a local implementation. +} + +// A CallSequence_Server is a CallSequence with a local implementation. type CallSequence_Server interface { GetNumber(context.Context, CallSequence_getNumber) error } @@ -6128,6 +6146,7 @@ type Pipeliner capnp.Client const Pipeliner_TypeID = 0xd6514008f0f84ebc func (c Pipeliner) NewPipeliner(ctx context.Context, params func(Pipeliner_newPipeliner_Params) error) (Pipeliner_newPipeliner_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xd6514008f0f84ebc, @@ -6140,10 +6159,14 @@ func (c Pipeliner) NewPipeliner(ctx context.Context, params func(Pipeliner_newPi s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} s.PlaceArgs = func(s capnp.Struct) error { return params(Pipeliner_newPipeliner_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return Pipeliner_newPipeliner_Results_Future{Future: ans.Future()}, release + } + func (c Pipeliner) GetNumber(ctx context.Context, params func(CallSequence_getNumber_Params) error) (CallSequence_getNumber_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xabaedf5f7817c820, @@ -6156,8 +6179,14 @@ func (c Pipeliner) GetNumber(ctx context.Context, params func(CallSequence_getNu s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} s.PlaceArgs = func(s capnp.Struct) error { return params(CallSequence_getNumber_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return CallSequence_getNumber_Results_Future{Future: ans.Future()}, release + +} + +func (c Pipeliner) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() } // String returns a string that identifies this capability for debugging @@ -6225,7 +6254,9 @@ func (c Pipeliner) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c Pipeliner) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A Pipeliner_Server is a Pipeliner with a local implementation. +} + +// A Pipeliner_Server is a Pipeliner with a local implementation. type Pipeliner_Server interface { NewPipeliner(context.Context, Pipeliner_newPipeliner) error diff --git a/rpc/bench_test.go b/rpc/bench_test.go index 9eceb1e5..8db08ae6 100644 --- a/rpc/bench_test.go +++ b/rpc/bench_test.go @@ -10,7 +10,6 @@ import ( "capnproto.org/go/capnp/v3/flowcontrol" "capnproto.org/go/capnp/v3/rpc" testcp "capnproto.org/go/capnp/v3/rpc/internal/testcapnp" - "capnproto.org/go/capnp/v3/std/capnp/stream" ) type benchmarkStreamingConfig struct { @@ -47,30 +46,21 @@ func benchmarkStreaming(b *testing.B, cfg *benchmarkStreamingConfig) { defer conn2.Close() bootstrap := testcp.StreamTest(conn2.Bootstrap(ctx)) defer bootstrap.Release() - var ( - futures []stream.StreamResult_Future - releaseFuncs []capnp.ReleaseFunc - ) bootstrap.SetFlowLimiter(flowcontrol.NewFixedLimiter(cfg.FlowLimit)) data := make([]byte, cfg.MessageSize) b.ResetTimer() for i := 0; i < b.N; i++ { for j := 0; j < cfg.MessageCount; j++ { - fut, rel := bootstrap.Push(ctx, func(p testcp.StreamTest_push_Params) error { + err := bootstrap.Push(ctx, func(p testcp.StreamTest_push_Params) error { return p.SetData(data) }) - futures = append(futures, fut) - releaseFuncs = append(releaseFuncs, rel) + if err != nil { + b.Fatalf("Streaming call #%v failed: %v", j, err) + } } } - for i, fut := range futures { - _, err := fut.Struct() - if err != nil { - b.Errorf("Error waiting on future #%v: %v", i, err) - } - } - for _, rel := range releaseFuncs { - rel() + if err := bootstrap.WaitStreaming(); err != nil { + b.Errorf("Error waiting on streaming calls: %v", err) } } diff --git a/rpc/internal/testcapnp/test.capnp.go b/rpc/internal/testcapnp/test.capnp.go index 03dc42f3..e68dcdce 100644 --- a/rpc/internal/testcapnp/test.capnp.go +++ b/rpc/internal/testcapnp/test.capnp.go @@ -19,6 +19,7 @@ type PingPong capnp.Client const PingPong_TypeID = 0xf004c474c2f8ee7a func (c PingPong) EchoNum(ctx context.Context, params func(PingPong_echoNum_Params) error) (PingPong_echoNum_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xf004c474c2f8ee7a, @@ -31,8 +32,14 @@ func (c PingPong) EchoNum(ctx context.Context, params func(PingPong_echoNum_Para s.ArgsSize = capnp.ObjectSize{DataSize: 8, PointerCount: 0} s.PlaceArgs = func(s capnp.Struct) error { return params(PingPong_echoNum_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return PingPong_echoNum_Results_Future{Future: ans.Future()}, release + +} + +func (c PingPong) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() } // String returns a string that identifies this capability for debugging @@ -100,7 +107,9 @@ func (c PingPong) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c PingPong) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A PingPong_Server is a PingPong with a local implementation. +} + +// A PingPong_Server is a PingPong with a local implementation. type PingPong_Server interface { EchoNum(context.Context, PingPong_echoNum) error } @@ -314,7 +323,7 @@ type StreamTest capnp.Client // StreamTest_TypeID is the unique identifier for the type StreamTest. const StreamTest_TypeID = 0xbb3ca85b01eea465 -func (c StreamTest) Push(ctx context.Context, params func(StreamTest_push_Params) error) (stream.StreamResult_Future, capnp.ReleaseFunc) { +func (c StreamTest) Push(ctx context.Context, params func(StreamTest_push_Params) error) error { s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xbb3ca85b01eea465, @@ -327,8 +336,13 @@ func (c StreamTest) Push(ctx context.Context, params func(StreamTest_push_Params s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 1} s.PlaceArgs = func(s capnp.Struct) error { return params(StreamTest_push_Params(s)) } } - ans, release := capnp.Client(c).SendCall(ctx, s) - return stream.StreamResult_Future{Future: ans.Future()}, release + + return capnp.Client(c).SendStreamCall(ctx, s) + +} + +func (c StreamTest) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() } // String returns a string that identifies this capability for debugging @@ -396,7 +410,9 @@ func (c StreamTest) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c StreamTest) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A StreamTest_Server is a StreamTest with a local implementation. +} + +// A StreamTest_Server is a StreamTest with a local implementation. type StreamTest_Server interface { Push(context.Context, StreamTest_push) error } @@ -544,6 +560,7 @@ type CapArgsTest capnp.Client const CapArgsTest_TypeID = 0xb86bce7f916a10cc func (c CapArgsTest) Call(ctx context.Context, params func(CapArgsTest_call_Params) error) (CapArgsTest_call_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xb86bce7f916a10cc, @@ -556,10 +573,14 @@ func (c CapArgsTest) Call(ctx context.Context, params func(CapArgsTest_call_Para s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 1} s.PlaceArgs = func(s capnp.Struct) error { return params(CapArgsTest_call_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return CapArgsTest_call_Results_Future{Future: ans.Future()}, release + } + func (c CapArgsTest) Self(ctx context.Context, params func(CapArgsTest_self_Params) error) (CapArgsTest_self_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xb86bce7f916a10cc, @@ -572,8 +593,14 @@ func (c CapArgsTest) Self(ctx context.Context, params func(CapArgsTest_self_Para s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} s.PlaceArgs = func(s capnp.Struct) error { return params(CapArgsTest_self_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return CapArgsTest_self_Results_Future{Future: ans.Future()}, release + +} + +func (c CapArgsTest) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() } // String returns a string that identifies this capability for debugging @@ -641,7 +668,9 @@ func (c CapArgsTest) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c CapArgsTest) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A CapArgsTest_Server is a CapArgsTest with a local implementation. +} + +// A CapArgsTest_Server is a CapArgsTest with a local implementation. type CapArgsTest_Server interface { Call(context.Context, CapArgsTest_call) error @@ -1043,6 +1072,7 @@ type PingPongProvider capnp.Client const PingPongProvider_TypeID = 0x95b6142577e93239 func (c PingPongProvider) PingPong(ctx context.Context, params func(PingPongProvider_pingPong_Params) error) (PingPongProvider_pingPong_Results_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0x95b6142577e93239, @@ -1055,8 +1085,14 @@ func (c PingPongProvider) PingPong(ctx context.Context, params func(PingPongProv s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} s.PlaceArgs = func(s capnp.Struct) error { return params(PingPongProvider_pingPong_Params(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return PingPongProvider_pingPong_Results_Future{Future: ans.Future()}, release + +} + +func (c PingPongProvider) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() } // String returns a string that identifies this capability for debugging @@ -1124,7 +1160,9 @@ func (c PingPongProvider) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c PingPongProvider) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A PingPongProvider_Server is a PingPongProvider with a local implementation. +} + +// A PingPongProvider_Server is a PingPongProvider with a local implementation. type PingPongProvider_Server interface { PingPong(context.Context, PingPongProvider_pingPong) error } diff --git a/rpc/streaming_test.go b/rpc/streaming_test.go new file mode 100644 index 00000000..a0d3b812 --- /dev/null +++ b/rpc/streaming_test.go @@ -0,0 +1,45 @@ +package rpc_test + +import ( + "context" + "fmt" + "testing" + + "capnproto.org/go/capnp/v3/rpc/internal/testcapnp" + "github.com/stretchr/testify/assert" +) + +// TestStreamingWaitOk verifies that if no errors occur in streaming calls, +// WaitStreaming retnurs nil. +func TestStreamingWaitOk(t *testing.T) { + ctx := context.Background() + client := testcapnp.StreamTest_ServerToClient(&maxPushStream{limit: 1}) + defer client.Release() + assert.NoError(t, client.Push(ctx, nil)) + assert.NoError(t, client.WaitStreaming()) +} + +// TestStreamingWaitErr verifies that if an error occurs in a streaming call, +// it shows up in a subsequent call to WaitStreaming(). +func TestStreamingWaitErr(t *testing.T) { + ctx := context.Background() + client := testcapnp.StreamTest_ServerToClient(&maxPushStream{limit: 0}) + defer client.Release() + assert.NoError(t, client.Push(ctx, nil)) + assert.NotNil(t, client.WaitStreaming()) +} + +// A maxPushStream is an implementation of StreamTest that +// starts returning errors after a specified number of calls. +type maxPushStream struct { + count int // How many calls have we seen? + limit int // How many calls are permitted? +} + +func (m *maxPushStream) Push(context.Context, testcapnp.StreamTest_push) error { + m.count++ + if m.count > m.limit { + return fmt.Errorf("Exceeded limit of %v calls", m.limit) + } + return nil +} diff --git a/std/capnp/persistent/persistent.capnp.go b/std/capnp/persistent/persistent.capnp.go index 4ad24ed4..9a7ab855 100644 --- a/std/capnp/persistent/persistent.capnp.go +++ b/std/capnp/persistent/persistent.capnp.go @@ -20,6 +20,7 @@ type Persistent capnp.Client const Persistent_TypeID = 0xc8cb212fcd9f5691 func (c Persistent) Save(ctx context.Context, params func(Persistent_SaveParams) error) (Persistent_SaveResults_Future, capnp.ReleaseFunc) { + s := capnp.Send{ Method: capnp.Method{ InterfaceID: 0xc8cb212fcd9f5691, @@ -32,8 +33,14 @@ func (c Persistent) Save(ctx context.Context, params func(Persistent_SaveParams) s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 1} s.PlaceArgs = func(s capnp.Struct) error { return params(Persistent_SaveParams(s)) } } + ans, release := capnp.Client(c).SendCall(ctx, s) return Persistent_SaveResults_Future{Future: ans.Future()}, release + +} + +func (c Persistent) WaitStreaming() error { + return capnp.Client(c).WaitStreaming() } // String returns a string that identifies this capability for debugging @@ -101,7 +108,9 @@ func (c Persistent) SetFlowLimiter(lim fc.FlowLimiter) { // for this client. func (c Persistent) GetFlowLimiter() fc.FlowLimiter { return capnp.Client(c).GetFlowLimiter() -} // A Persistent_Server is a Persistent with a local implementation. +} + +// A Persistent_Server is a Persistent with a local implementation. type Persistent_Server interface { Save(context.Context, Persistent_save) error }