Skip to content

Commit

Permalink
Merge pull request #453 from zenhack/go-util
Browse files Browse the repository at this point in the history
Use go-util module, with some trivial substitutions
  • Loading branch information
lthibault authored Feb 18, 2023
2 parents f72f371 + 9c9a722 commit 8a72aaf
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 59 deletions.
19 changes: 7 additions & 12 deletions flowcontrol/internal/test-tool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"capnproto.org/go/capnp/v3/flowcontrol/tracing"
"capnproto.org/go/capnp/v3/internal/syncutil"
"capnproto.org/go/capnp/v3/rpc"
"zenhack.net/go/util"
)

var (
Expand Down Expand Up @@ -52,7 +53,7 @@ func main() {

func doClient(ctx context.Context) {
netConn, err := net.Dial("tcp", *addr)
chkfatal(err)
util.Chkfatal(err)
rpcConn := rpc.NewConn(rpc.NewStreamTransport(netConn), nil)
defer rpcConn.Close()
w := Writer(rpcConn.Bootstrap(ctx))
Expand Down Expand Up @@ -99,7 +100,7 @@ func doClient(ctx context.Context) {
wg := &sync.WaitGroup{}
for sent < *totaldata && ctx.Err() == nil {
fut, rel := w.Write(ctx, func(p Writer_write_Params) error {
chkfatal(p.SetData(make([]byte, *packetsize)))
util.Chkfatal(p.SetData(make([]byte, *packetsize)))
sz, _ := p.Message().TotalSize()
sent += int(sz)
return nil
Expand All @@ -110,7 +111,7 @@ func doClient(ctx context.Context) {
wg.Wait()

endTime := time.Now()
chkfatal(ctx.Err())
util.Chkfatal(ctx.Err())

duration := endTime.Sub(startTime)
bandwidth := float64(sent) / (float64(duration) / float64(time.Second))
Expand All @@ -129,7 +130,7 @@ func doClient(ctx context.Context) {

enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
chkfatal(enc.Encode(report))
util.Chkfatal(enc.Encode(report))
}

func waitAsync(wg *sync.WaitGroup, fut Writer_write_Results_Future, rel capnp.ReleaseFunc) {
Expand All @@ -138,13 +139,13 @@ func waitAsync(wg *sync.WaitGroup, fut Writer_write_Results_Future, rel capnp.Re
defer wg.Done()
defer rel()
_, err := fut.Struct()
chkfatal(err)
util.Chkfatal(err)
}()
}

func doServer() {
l, err := net.Listen("tcp", *addr)
chkfatal(err)
util.Chkfatal(err)
for {
netConn, err := l.Accept()
if err != nil {
Expand All @@ -160,12 +161,6 @@ func doServer() {
}
}

func chkfatal(err error) {
if err != nil {
panic(err)
}
}

type writerImpl struct {
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/stretchr/testify v1.7.0
github.com/tinylib/msgp v1.1.5
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
zenhack.net/go/util v0.0.0-20230218002511-744d2d6d1739
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,5 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
zenhack.net/go/util v0.0.0-20230218002511-744d2d6d1739 h1:/QnbZBURrZUFvnxB4wDyRrPsWzh2KWbJ6AjUjohCHJ8=
zenhack.net/go/util v0.0.0-20230218002511-744d2d6d1739/go.mod h1:0lafdGg7tDb7RcXASgmJmRbLFLkAxu328+KGIs7icDE=
67 changes: 31 additions & 36 deletions rpc/receiveranswer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc/internal/testcapnp"
"capnproto.org/go/capnp/v3/server"
"zenhack.net/go/util"
)

type capArgsTest struct {
Expand All @@ -30,7 +31,7 @@ func (me *capArgsTest) Self(ctx context.Context, p testcapnp.CapArgsTest_self) e
func (me *capArgsTest) Call(ctx context.Context, p testcapnp.CapArgsTest_call) error {
defer close(me.Errs)
client := p.Args().Cap()
chkfatal(client.Resolve(ctx))
util.Chkfatal(client.Resolve(ctx))
brand, ok := server.IsServer(client.State().Brand)
if !ok {
err := fmt.Errorf("server.IsServer returned !ok")
Expand All @@ -46,12 +47,6 @@ func (me *capArgsTest) Call(ctx context.Context, p testcapnp.CapArgsTest_call) e
return nil
}

func chkfatal(err error) {
if err != nil {
panic(err)
}
}

func TestBootstrapReceiverAnswerRpc(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -84,7 +79,7 @@ func TestBootstrapReceiverAnswerRpc(t *testing.T) {
c.Release()

_, err := res.Struct()
chkfatal(err)
util.Chkfatal(err)

for err := range errChan {
t.Errorf("Error: %v", err)
Expand Down Expand Up @@ -127,9 +122,9 @@ func TestCallReceiverAnswerRpc(t *testing.T) {
defer rel()

_, err := selfRes.Struct()
chkfatal(err)
util.Chkfatal(err)
_, err = callRes.Struct()
chkfatal(err)
util.Chkfatal(err)

for err = range errChan {
t.Errorf("Error: %v", err)
Expand All @@ -156,41 +151,41 @@ func TestBootstrapReceiverAnswer(t *testing.T) {
trans := NewStreamTransport(cClient)

outMsg, err := trans.NewMessage()
chkfatal(err)
util.Chkfatal(err)

bs, err := outMsg.Message.NewBootstrap()
chkfatal(err)
util.Chkfatal(err)
bs.SetQuestionId(0)
outMsg.Send()
outMsg.Release()

outMsg, err = trans.NewMessage()
chkfatal(err)
util.Chkfatal(err)

// bootstrap.call(cap = bootstrap)
call, err := outMsg.Message.NewCall()
chkfatal(err)
util.Chkfatal(err)
call.SetQuestionId(1)
tgt, err := call.NewTarget()
chkfatal(err)
util.Chkfatal(err)
pa, err := tgt.NewPromisedAnswer()
chkfatal(err)
util.Chkfatal(err)
pa.SetQuestionId(0)
// Can leave off transform, since the root of the response is the
// bootstrap capability.
call.SetInterfaceId(testcapnp.CapArgsTest_TypeID)
call.SetMethodId(0)
params, err := call.NewParams()
chkfatal(err)
util.Chkfatal(err)
capTable, err := params.NewCapTable(1)
chkfatal(err)
util.Chkfatal(err)
capDesc := capTable.At(0)
ra, err := capDesc.NewReceiverAnswer()
chkfatal(err)
util.Chkfatal(err)
ra.SetQuestionId(0)
seg := params.Segment()
argStruct, err := capnp.NewStruct(seg, capnp.ObjectSize{PointerCount: 1})
chkfatal(err)
util.Chkfatal(err)
argStruct.SetPtr(0, capnp.NewInterface(seg, 0).ToPtr())
params.SetContent(argStruct.ToPtr())
outMsg.Send()
Expand Down Expand Up @@ -221,63 +216,63 @@ func TestCallReceiverAnswer(t *testing.T) {
trans := NewStreamTransport(cClient)

outMsg, err := trans.NewMessage()
chkfatal(err)
util.Chkfatal(err)

bs, err := outMsg.Message.NewBootstrap()
chkfatal(err)
util.Chkfatal(err)
bs.SetQuestionId(0)
outMsg.Send()
outMsg.Release()

outMsg, err = trans.NewMessage()
chkfatal(err)
util.Chkfatal(err)

// qid1 = bootstrap.self()
call, err := outMsg.Message.NewCall()
chkfatal(err)
util.Chkfatal(err)
call.SetQuestionId(1)
tgt, err := call.NewTarget()
chkfatal(err)
util.Chkfatal(err)
pa, err := tgt.NewPromisedAnswer()
chkfatal(err)
util.Chkfatal(err)
pa.SetQuestionId(0)
call.SetInterfaceId(testcapnp.CapArgsTest_TypeID)
call.SetMethodId(1)
outMsg.Send()
outMsg.Release()

outMsg, err = trans.NewMessage()
chkfatal(err)
util.Chkfatal(err)

// qid1.self.call(cap = qid1.self)
call, err = outMsg.Message.NewCall()
chkfatal(err)
util.Chkfatal(err)
call.SetQuestionId(2)
tgt, err = call.NewTarget()
chkfatal(err)
util.Chkfatal(err)
pa, err = tgt.NewPromisedAnswer()
chkfatal(err)
util.Chkfatal(err)
pa.SetQuestionId(1)
transform, err := pa.NewTransform(1)
chkfatal(err)
util.Chkfatal(err)
transform.At(0).SetGetPointerField(0)
call.SetInterfaceId(testcapnp.CapArgsTest_TypeID)
call.SetMethodId(0)
params, err := call.NewParams()
chkfatal(err)
util.Chkfatal(err)
capTable, err := params.NewCapTable(1)
chkfatal(err)
util.Chkfatal(err)
capDesc := capTable.At(0)
ra, err := capDesc.NewReceiverAnswer()
chkfatal(err)
util.Chkfatal(err)
transform.At(0).SetGetPointerField(0)
ra.SetQuestionId(1)
transform, err = ra.NewTransform(1)
chkfatal(err)
util.Chkfatal(err)
transform.At(0).SetGetPointerField(0)
seg := params.Segment()
argStruct, err := capnp.NewStruct(seg, capnp.ObjectSize{PointerCount: 1})
chkfatal(err)
util.Chkfatal(err)
argStruct.SetPtr(0, capnp.NewInterface(seg, 0).ToPtr())
params.SetContent(argStruct.ToPtr())
outMsg.Send()
Expand Down
13 changes: 2 additions & 11 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"golang.org/x/sync/errgroup"
"zenhack.net/go/util"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/exc"
Expand Down Expand Up @@ -725,16 +726,6 @@ func (c *Conn) handleBootstrap(ctx context.Context, id answerID) error {
return err
}

func idempotent(f func()) func() {
called := false
return func() {
if !called {
called = true
f()
}
}
}

func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capnp.ReleaseFunc) error {
rl := &releaseList{}
defer rl.Release()
Expand Down Expand Up @@ -826,7 +817,7 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn
recv := capnp.Recv{
Args: p.args,
Method: p.method,
ReleaseArgs: idempotent(releaseCall),
ReleaseArgs: util.Idempotent(releaseCall),
Returner: ans,
}

Expand Down

0 comments on commit 8a72aaf

Please sign in to comment.