Skip to content

Commit

Permalink
Merge pull request capnproto#450 from zenhack/streaming-api
Browse files Browse the repository at this point in the history
Streaming support helpers.
  • Loading branch information
zenhack committed Feb 14, 2023
2 parents f07bb8d + d2e85b5 commit f72f371
Show file tree
Hide file tree
Showing 10 changed files with 326 additions and 36 deletions.
58 changes: 58 additions & 0 deletions capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"
"sync"

"capnproto.org/go/capnp/v3/exc"
"capnproto.org/go/capnp/v3/exp/bufferpool"
"capnproto.org/go/capnp/v3/flowcontrol"
"capnproto.org/go/capnp/v3/internal/str"
Expand Down Expand Up @@ -126,6 +127,11 @@ type client struct {
limiter flowcontrol.FlowLimiter
h *clientHook // nil if resolved to nil or released
released bool

stream struct {
err error // Last error from streaming calls.
wg sync.WaitGroup // Outstanding calls.
}
}

// clientHook is a reference-counted wrapper for a ClientHook.
Expand Down Expand Up @@ -321,6 +327,15 @@ func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) {
return ErrorAnswer(s.Method, errors.New("call on null client")), func() {}
}

var err error
syncutil.With(&c.mu, func() {
err = c.stream.err
})

if err != nil {
return ErrorAnswer(s.Method, exc.WrapError("stream error", err)), func() {}
}

limiter := c.GetFlowLimiter()

// We need to call PlaceArgs before we will know the size of message for
Expand Down Expand Up @@ -381,6 +396,49 @@ func (c Client) SendCall(ctx context.Context, s Send) (*Answer, ReleaseFunc) {
return ans, rel
}

// SendStreamCall is like SendCall except that:
//
// 1. It does not return an answer for the eventual result.
// 2. If the call returns an error, all future calls on this
// client will return the same error (without starting
// the method or calling PlaceArgs).
func (c Client) SendStreamCall(ctx context.Context, s Send) error {
var streamError error
syncutil.With(&c.mu, func() {
streamError = c.stream.err
if streamError == nil {
c.stream.wg.Add(1)
}
})
if streamError != nil {
return streamError
}
ans, release := c.SendCall(ctx, s)
go func() {
defer c.stream.wg.Done()
_, err := ans.Future().Ptr()
release()
if err != nil {
syncutil.With(&c.mu, func() {
c.stream.err = err
})
}
}()
return nil
}

// WaitStreaming waits for all outstanding streaming calls (i.e. calls
// started with SendStreamCall) to complete, and then returns an error
// if any streaming call has failed.
func (c Client) WaitStreaming() error {
c.stream.wg.Wait()
var ret error
syncutil.With(&c.mu, func() {
ret = c.stream.err
})
return ret
}

// RecvCall starts executing a method with the referenced arguments
// and returns an answer that will hold the result. The hook will call
// a.Release when it no longer needs to reference the parameters. The
Expand Down
5 changes: 5 additions & 0 deletions capnpc-go/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/internal/schema"
"capnproto.org/go/capnp/v3/std/capnp/stream"
)

type node struct {
Expand Down Expand Up @@ -100,6 +101,10 @@ type interfaceMethod struct {
Results *node
}

func (m interfaceMethod) IsStreaming() bool {
return m.Results.Id() == stream.StreamResult_TypeID
}

func methodSet(methods []interfaceMethod, n *node, nodes nodeMap) ([]interfaceMethod, error) {
ms, _ := n.Interface().Methods()
for i := 0; i < ms.Len(); i++ {
Expand Down
17 changes: 15 additions & 2 deletions capnpc-go/templates/interfaceClient
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ type {{.Node.Name}} capnp.Client
{{ template "_typeid" .Node }}

{{range .Methods -}}
func (c {{$.Node.Name}}) {{.Name|title}}(ctx {{$.G.Imports.Context}}.Context, params func({{$.G.RemoteNodeName .Params $.Node}}) error) ({{$.G.RemoteNodeName .Results $.Node}}_Future, capnp.ReleaseFunc) {

func (c {{$.Node.Name}}) {{.Name|title}}(ctx {{$.G.Imports.Context}}.Context, params func({{$.G.RemoteNodeName .Params $.Node}}) error)
{{- if .IsStreaming }} error {
{{- else }} ({{$.G.RemoteNodeName .Results $.Node}}_Future, capnp.ReleaseFunc) {
{{ end }}
s := capnp.Send{
Method: capnp.Method{
{{template "_interfaceMethod" .}}
Expand All @@ -16,11 +20,20 @@ func (c {{$.Node.Name}}) {{.Name|title}}(ctx {{$.G.Imports.Context}}.Context, pa
s.ArgsSize = {{$.G.ObjectSize .Params}}
s.PlaceArgs = func(s capnp.Struct) error { return params({{$.G.RemoteNodeName .Params $.Node}}(s)) }
}
{{ if .IsStreaming }}
return capnp.Client(c).SendStreamCall(ctx, s)
{{ else }}
ans, release := capnp.Client(c).SendCall(ctx, s)
return {{$.G.RemoteNodeName .Results $.Node}}_Future{Future: ans.Future()}, release
{{ end }}
}

{{end}}

func (c {{$.Node.Name}}) WaitStreaming() error {
return capnp.Client(c).WaitStreaming()
}

// String returns a string that identifies this capability for debugging
// purposes. Its format should not be depended on: in particular, it
// should not be used to compare clients. Use IsSame to compare clients
Expand Down Expand Up @@ -86,4 +99,4 @@ func (c {{$.Node.Name}}) SetFlowLimiter(lim {{.G.Imports.FlowControl}}.FlowLimit
// for this client.
func (c {{$.Node.Name}}) GetFlowLimiter() {{.G.Imports.FlowControl}}.FlowLimiter {
return capnp.Client(c).GetFlowLimiter()
}
}
92 changes: 92 additions & 0 deletions docs/Remote-Procedure-Calls-using-Interfaces.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,98 @@ It isn't until the `Struct()` method is called on a method that the `client` fun

A few additional words on the Future type are in order. If your RPC method returns another interface type, you can use the Future to immediately make calls against that as-of-yet-unreturned interface. This relies on a feature of the Cap'n Proto RPC protocol called [promise pipelining][pipelining], the advantage of which is that Cap'n Proto can often optimize away the additional network round-trips when such method calls are chained. This is one of Cap'n Proto's key advantages, which we will use heavily in the next chapter.

## Streaming and Backpressure

Cap'n Proto supports streaming workflows. Unlike other RPC protocols
such as grpc, this can be done without any dedicated "streaming"
construct. Instead, you can define an interface such as:

```capnp
interface ByteStream {
write @0 (data :Data);
done @1 ();
}
```

The above is roughly analogous to the `io.WriteCloser` interface. If you
have a `ByteStream` interface, you can write your data into it in
chunks, and then call done() to signal that all data has been
written.

There are however two wrinkles.

The first is flow control. If you naively call write() in a loop, Cap'n
Proto will not by default provide any backpressure, resulting in excess
memory usage and high latency. But waiting for each call in turn results
in low throughput. To solve this, you need to attach a flow limiter,
from the `flowcontrol` package. You can do this with `SetFlowLimiter` on
any capability:

```go
import "capnproto.org/go/capnp/v3/flowcontrol"

// ...

// Limits in-flight data to 2^16 bytes = 64KiB:
client.SetFlowLimiter(flowcontrol.NewFixedLimiter(1 << 16))
```

If too much data is already in-flight, This will cause future rpc calls
to block until some existing ones have returned.

The second wrinkle is dealing with return values: even though
`ByteStream.write()` has no return value, each call will return a future
and ReleaseFunc which must be waited on at some point. You could
accumulate these in a slice and wait on each of them at the end, but
there is a better option: the schema language supports a special
`stream` return type:

```capnp
interface ByteStream {
write @0 (data :Data) -> stream;
done @1 ();
}
```

If the return type of a method is `stream`, instead of returning
a future and `ReleaseFunc`, the generated method will just return an
error:

```diff
- func (c ByteStream) Write(ctx context.Context, params func(ByteStream_write_Params) error) (stream.ByteStream_write_Results_Future, capnp.ReleaseFunc) {
+ func (c ByteStream) Write(ctx context.Context, params func(ByteStream_write_Params) error) error {
```

The implementation will take care of waiting on the Future. If the
error is non-nil, it means some prior streaming call failed; this
can be useful for short-circuiting long streaming workflows.

Additionally, each client type has a `WaitStreaming` method, which should
be called at the end of a streaming workflow. A full example might look
like:

```go
for chunk := range chunks {
err := client.Write(ctx, func(p ByteStream_write_Params) error {
return p.SetData(chunk)
})
if err != nil {
return err
}
}

future, release := client.Done(ctx, nil)
defer release()
_, err := future.Struct()
if err != nil {
return err
}

if err := client.WaitStreaming(); err != nil {
return err
}
```

[rpc]: https://capnproto.org/rpc.html
[pipelining]: https://capnproto.org/news/2013-12-13-promise-pipelining-capnproto-vs-ice.html

Expand Down
23 changes: 16 additions & 7 deletions flowcontrol/internal/test-tool/writer.capnp.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f72f371

Please sign in to comment.