Skip to content

Commit

Permalink
Merge pull request #580 from matheusd/fix-reuse-test
Browse files Browse the repository at this point in the history
message: Fix benchmark read reuse test
  • Loading branch information
lthibault authored Aug 6, 2024
2 parents fafeb47 + b0b04f3 commit 1d667cd
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 20 deletions.
116 changes: 100 additions & 16 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1725,63 +1725,147 @@ func unsafeBytesToString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}

// BenchmarkMarshal benchmarks marshalling a message using the simplest API
// available to go-capnp users.
func BenchmarkMarshal(b *testing.B) {
r := rand.New(rand.NewSource(12345))
data := make([]*A, 1000)
for i := range data {
data[i] = generateA(r)
}
arena := make([]byte, 0, 512)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
a := data[r.Intn(len(data))]
msg, seg, _ := capnp.NewMessage(capnp.SingleSegment(arena[:0]))
root, _ := air.NewRootBenchmarkA(seg)
msg, seg, err := capnp.NewMessage(capnp.SingleSegment(nil))
if err != nil {
b.Fatal(err)
}
root, err := air.NewRootBenchmarkA(seg)
if err != nil {
b.Fatal(err)
}
a.fill(root)
msg.Marshal()
_, err = msg.Marshal()
if err != nil {
b.Fatal(err)
}
}
}

// BenchmarkMarshal_ReuseMsg benchmarks marshalling while reusing the message
// object.
func BenchmarkMarshal_ReuseMsg(b *testing.B) {
r := rand.New(rand.NewSource(12345))
data := make([]*A, 1000)
for i := range data {
data[i] = generateA(r)
}
msg, _, err := capnp.NewMessage(capnp.SingleSegment(nil))
if err != nil {
b.Fatal(err)
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
a := data[r.Intn(len(data))]
seg, err := msg.Reset(msg.Arena)
if err != nil {
b.Fatal(err)
}
root, err := air.NewRootBenchmarkA(seg)
if err != nil {
b.Fatal(err)
}
a.fill(root)
_, err = msg.Marshal()
if err != nil {
b.Fatal(err)
}
}
}

// BenchmarkUnmarshal benchmarks unmarshalling using the simplest API possible
// available to go-capnp users.
func BenchmarkUnmarshal(b *testing.B) {
r := rand.New(rand.NewSource(12345))
data := make([][]byte, 1000)
type testCase struct {
a A
data []byte
}

data := make([]testCase, 1000)
for i := range data {
a := generateA(r)
msg, seg, _ := capnp.NewMessage(capnp.SingleSegment(nil))
root, _ := air.NewRootBenchmarkA(seg)
a.fill(root)
data[i], _ = msg.Marshal()
buf, err := msg.Marshal()
if err != nil {
b.Fatal(err)
}
data[i].data, data[i].a = buf, *a
}

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
msg, _ := capnp.Unmarshal(data[r.Intn(len(data))])
a, _ := air.ReadRootBenchmarkA(msg)
unmarshalA(a)
testIdx := r.Intn(len(data))
msg, err := capnp.Unmarshal(data[testIdx].data)
if err != nil {
b.Fatal(err)
}
a, err := air.ReadRootBenchmarkA(msg)
if err != nil {
b.Fatal(err)
}
gotA := unmarshalA(a)
if gotA != data[testIdx].a {
b.Fatal("unexpected unmarshalled data")
}
}
}

// BenchmarkUnmarshal_Reuse benchmarks unmarshalling by reusing the msg object
// and directly using the test data buffer as an arena.
//
// NOTE: this bypasses framing done by capnp marshalling, thus expects the
// caller to frame the message appropriately.
func BenchmarkUnmarshal_Reuse(b *testing.B) {
type testCase struct {
a A
data []byte
}
r := rand.New(rand.NewSource(12345))
data := make([][]byte, 1000)
data := make([]testCase, 1000)
for i := range data {
a := generateA(r)
msg, seg, _ := capnp.NewMessage(capnp.SingleSegment(nil))
root, _ := air.NewRootBenchmarkA(seg)
a.fill(root)
data[i], _ = msg.Marshal()
buf, err := msg.Marshal()
if err != nil {
b.Fatal(err)
}
data[i].data, data[i].a = buf, *a
}
msg := new(capnp.Message)
ta := new(testArena)
arena := capnp.Arena(ta)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
*ta = testArena(data[r.Intn(len(data))][8:])
msg.Reset(arena)
a, _ := air.ReadRootBenchmarkA(msg)
unmarshalA(a)
testIdx := r.Intn(len(data))
*ta = testArena(data[testIdx].data[8:]) // [8:] to skip header
msg.Release()
msg.Arena = ta
a, err := air.ReadRootBenchmarkA(msg)
if err != nil {
b.Fatal(err)
}
gotA := unmarshalA(a)
if gotA != data[testIdx].a {
b.Fatal("unexpected unmarshalled data")
}
}
}

Expand Down
18 changes: 14 additions & 4 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ const maxDepth = ^uint(0)
// A Message is a tree of Cap'n Proto objects, split into one or more
// segments of contiguous memory. The only required field is Arena.
// A Message is safe to read from multiple goroutines.
//
// A message must be set up with a fully valid Arena when reading or with
// a valid and empty arena by calling NewArena.
type Message struct {
// rlimit must be first so that it is 64-bit aligned.
// See sync/atomic docs.
Expand Down Expand Up @@ -60,6 +63,9 @@ type Message struct {

// NewMessage creates a message with a new root and returns the first
// segment. It is an error to call NewMessage on an arena with data in it.
//
// The new message is guaranteed to contain at least one segment and that
// segment is guaranteed to contain enough space for the root struct pointer.
func NewMessage(arena Arena) (*Message, *Segment, error) {
var msg Message
first, err := msg.Reset(arena)
Expand Down Expand Up @@ -93,10 +99,14 @@ func (m *Message) Release() {
m.Reset(nil)
}

// Reset the message to use a different arena, allowing it
// to be reused. This invalidates any existing pointers in
// the Message, releases all clients in the cap table, and
// releases the current Arena, so use with caution.
// Reset the message to use a different arena, allowing it to be reused. This
// invalidates any existing pointers in the Message, releases all clients in
// the cap table, and releases the current Arena, so use with caution.
//
// Reset fails if the new arena is not empty and is not able to allocate enough
// space for at least one segment and its root pointer. In other words, Reset
// with a non-nil arena must only be used for messages which will be modified,
// not read.
func (m *Message) Reset(arena Arena) (first *Segment, err error) {
m.capTable.Reset()
for k := range m.segs {
Expand Down
14 changes: 14 additions & 0 deletions message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,3 +661,17 @@ func BenchmarkMessageGetFirstSegment(b *testing.B) {
}
}
}

// TestCannotResetArenaForRead demonstrates that Reset() cannot be used when
// intending to read data from an arena (i.e. cannot reuse a msg value by
// calling Reset with the intention to read data).
func TestCannotResetArenaForRead(t *testing.T) {
var msg Message
var arena Arena = SingleSegment(incrementingData(8))

_, err := msg.Reset(arena)
if err == nil {
t.Fatal("expected non nil error, got nil")
}
t.Logf("Got err: %v", err)
}
2 changes: 2 additions & 0 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,8 @@ func (s *Segment) writePtr(off address, src Ptr, forceCopy bool) error {
return nil
case hasCapacity(src.seg.data, wordSize):
// Enough room adjacent to src to write a far pointer landing pad.
// TODO: instead of alloc (which may choose another segment),
// enforce to _always_ use seg (because we know it has capacity).
_, padAddr, _ := alloc(src.seg, wordSize)
src.seg.writeRawPointer(padAddr, srcRaw.withOffset(nearPointerOffset(padAddr, srcAddr)))
s.writeRawPointer(off, rawFarPointer(src.seg.id, padAddr))
Expand Down

0 comments on commit 1d667cd

Please sign in to comment.