Skip to content

Commit

Permalink
Add lock to writer sink to ensure thread safety (#110)
Browse files Browse the repository at this point in the history
The included test will show data races in the race detector if the
safety goes away.
  • Loading branch information
jefferai authored Feb 16, 2024
1 parent efdb51e commit 7a72cf9
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
5 changes: 5 additions & 0 deletions sinks/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"errors"
"io"
"sync"

"github.com/hashicorp/eventlogger"
)
Expand All @@ -16,6 +17,8 @@ import (
// string. Sink allows you to define sinks for any io.Writer which
// includes os.Stdout and os.Stderr
type Sink struct {
l sync.RWMutex

// Format specifies the format the []byte representation is formatted in
// Defaults to JSONFormat
Format string
Expand Down Expand Up @@ -51,6 +54,8 @@ func (fs *Sink) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger
}
reader := bytes.NewReader(val)

fs.l.Lock()
defer fs.l.Unlock()
if _, err := reader.WriteTo(fs.Writer); err != nil {
return nil, err
}
Expand Down
38 changes: 38 additions & 0 deletions sinks/writer/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"context"
"io"
"io/ioutil"
"math/rand"
"os"
"sync"
"testing"
"time"

"github.com/hashicorp/eventlogger"
)
Expand Down Expand Up @@ -91,3 +94,38 @@ func TestWriterSink_Process(t *testing.T) {
}
})
}

// This test is a canary for the race detector
func TestWriterSink_Process_Concurrent(t *testing.T) {
ctx := context.Background()

event := &eventlogger.Event{
Formatted: map[string][]byte{eventlogger.JSONFormat: []byte("first\n")},
Payload: "First entry",
}

writer := new(bytes.Buffer)

s := Sink{
Writer: writer,
}

wg := new(sync.WaitGroup)
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
_, _ = s.Process(ctx, event)
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
}
}()
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
_, _ = s.Process(ctx, event)
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
}
}()

wg.Wait()
}

0 comments on commit 7a72cf9

Please sign in to comment.