Skip to content

Commit

Permalink
[chore][pkg/stanza] Move more tests into reader package (open-telemet…
Browse files Browse the repository at this point in the history
…ry#29666)

This also includes some related refactoring of test code:
- Moved `tokenWithLength` utility function to `filetest`, since it's not
used by both `fileconsumer` and `reader` packages.
- Refactored `testFactory` to start with a default config and then apply
options as necessary per test. I expect eventually the _actual_
`reader.Factory` will be cleaned up to support a similar pattern, but
I'm trying to keep changes to test code until we have better coverage in
the `reader` package.
  • Loading branch information
djaglowski committed Dec 5, 2023
1 parent d6dda62 commit 5d413ae
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 217 deletions.
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type benchFile struct {
}

func simpleTextFile(b *testing.B, file *os.File) *benchFile {
line := string(tokenWithLength(49)) + "\n"
line := string(filetest.TokenWithLength(49)) + "\n"
return &benchFile{
File: file,
log: func(_ int) {
Expand Down
189 changes: 11 additions & 178 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ func TestDecodeBufferIsResized(t *testing.T) {
}()

temp := filetest.OpenTemp(t, tempDir)
expected := tokenWithLength(1<<12 + 1)
expected := filetest.TokenWithLength(1<<12 + 1)
filetest.WriteString(t, temp, string(expected)+"\n")

sink.ExpectToken(t, expected)
Expand Down Expand Up @@ -938,10 +938,10 @@ func TestRestartOffsets(t *testing.T) {

logFile := filetest.OpenTemp(t, tempDir)

before1stRun := tokenWithLength(tc.lineLength)
during1stRun := tokenWithLength(tc.lineLength)
duringRestart := tokenWithLength(tc.lineLength)
during2ndRun := tokenWithLength(tc.lineLength)
before1stRun := filetest.TokenWithLength(tc.lineLength)
during1stRun := filetest.TokenWithLength(tc.lineLength)
duringRestart := filetest.TokenWithLength(tc.lineLength)
during2ndRun := filetest.TokenWithLength(tc.lineLength)

operatorOne, sink1 := testManager(t, cfg)
filetest.WriteString(t, logFile, string(before1stRun)+"\n")
Expand Down Expand Up @@ -1028,7 +1028,7 @@ func TestFileBatching(t *testing.T) {
expectedTokens := make([][]byte, 0, files*linesPerFile)
for i, temp := range temps {
for j := 0; j < linesPerFile; j++ {
message := fmt.Sprintf("%s %d %d", tokenWithLength(100), i, j)
message := fmt.Sprintf("%s %d %d", filetest.TokenWithLength(100), i, j)
_, err := temp.WriteString(message + "\n")
require.NoError(t, err)
expectedTokens = append(expectedTokens, []byte(message))
Expand All @@ -1045,7 +1045,7 @@ func TestFileBatching(t *testing.T) {
expectedTokens = make([][]byte, 0, files*linesPerFile)
for i, temp := range temps {
for j := 0; j < linesPerFile; j++ {
message := fmt.Sprintf("%s %d %d", tokenWithLength(20), i, j)
message := fmt.Sprintf("%s %d %d", filetest.TokenWithLength(20), i, j)
_, err := temp.WriteString(message + "\n")
require.NoError(t, err)
expectedTokens = append(expectedTokens, []byte(message))
Expand Down Expand Up @@ -1110,173 +1110,6 @@ func TestFileBatchingRespectsStartAtEnd(t *testing.T) {
sink.ExpectTokens(t, expectedTokens...)
}

func TestFileReader_FingerprintUpdated(t *testing.T) {
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
operator, sink := testManager(t, cfg)

temp := filetest.OpenTemp(t, tempDir)
tempCopy := filetest.OpenFile(t, temp.Name())
fp, err := operator.readerFactory.NewFingerprint(temp)
require.NoError(t, err)

reader, err := operator.readerFactory.NewReader(tempCopy, fp)
require.NoError(t, err)
defer reader.Close()

filetest.WriteString(t, temp, "testlog1\n")
reader.ReadToEnd(context.Background())
sink.ExpectToken(t, []byte("testlog1"))
require.Equal(t, []byte("testlog1\n"), reader.Fingerprint.FirstBytes)
}

// Test that a fingerprint:
// - Starts empty
// - Updates as a file is read
// - Stops updating when the max fingerprint size is reached
// - Stops exactly at max fingerprint size, regardless of content
func TestFingerprintGrowsAndStops(t *testing.T) {
t.Parallel()

// Use a number with many factors.
// Sometimes fingerprint length will align with
// the end of a line, sometimes not. Test both.
maxFP := 360

// Use prime numbers to ensure variation in
// whether or not they are factors of maxFP
lineLens := []int{3, 5, 7, 11, 13, 17, 19, 23, 27}

for _, lineLen := range lineLens {
t.Run(fmt.Sprintf("%d", lineLen), func(t *testing.T) {
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.FingerprintSize = helper.ByteSize(maxFP)
operator, _ := testManager(t, cfg)

temp := filetest.OpenTemp(t, tempDir)
tempCopy := filetest.OpenFile(t, temp.Name())
fp, err := operator.readerFactory.NewFingerprint(temp)
require.NoError(t, err)
require.Equal(t, []byte(""), fp.FirstBytes)

reader, err := operator.readerFactory.NewReader(tempCopy, fp)
require.NoError(t, err)
defer reader.Close()

// keep track of what has been written to the file
var fileContent []byte

// keep track of expected fingerprint size
expectedFP := 0

// Write lines until file is much larger than the length of the fingerprint
for len(fileContent) < 2*maxFP {
expectedFP += lineLen
if expectedFP > maxFP {
expectedFP = maxFP
}

line := string(tokenWithLength(lineLen-1)) + "\n"
fileContent = append(fileContent, []byte(line)...)

filetest.WriteString(t, temp, line)
reader.ReadToEnd(context.Background())
require.Equal(t, fileContent[:expectedFP], reader.Fingerprint.FirstBytes)
}
})
}
}

// This is same test like TestFingerprintGrowsAndStops, but with additional check for fingerprint size check
// Test that a fingerprint:
// - Starts empty
// - Updates as a file is read
// - Stops updating when the max fingerprint size is reached
// - Stops exactly at max fingerprint size, regardless of content
// - Do not change size after fingerprint configuration change
func TestFingerprintChangeSize(t *testing.T) {
t.Parallel()

// Use a number with many factors.
// Sometimes fingerprint length will align with
// the end of a line, sometimes not. Test both.
maxFP := 360

// Use prime numbers to ensure variation in
// whether or not they are factors of maxFP
lineLens := []int{3, 5, 7, 11, 13, 17, 19, 23, 27}

for _, lineLen := range lineLens {
t.Run(fmt.Sprintf("%d", lineLen), func(t *testing.T) {
t.Parallel()

tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.FingerprintSize = helper.ByteSize(maxFP)
operator, _ := testManager(t, cfg)

temp := filetest.OpenTemp(t, tempDir)
tempCopy := filetest.OpenFile(t, temp.Name())
fp, err := operator.readerFactory.NewFingerprint(temp)
require.NoError(t, err)
require.Equal(t, []byte(""), fp.FirstBytes)

reader, err := operator.readerFactory.NewReader(tempCopy, fp)
require.NoError(t, err)
defer reader.Close()

// keep track of what has been written to the file
var fileContent []byte

// keep track of expected fingerprint size
expectedFP := 0

// Write lines until file is much larger than the length of the fingerprint
for len(fileContent) < 2*maxFP {
expectedFP += lineLen
if expectedFP > maxFP {
expectedFP = maxFP
}

line := string(tokenWithLength(lineLen-1)) + "\n"
fileContent = append(fileContent, []byte(line)...)

filetest.WriteString(t, temp, line)
reader.ReadToEnd(context.Background())
require.Equal(t, fileContent[:expectedFP], reader.Fingerprint.FirstBytes)
}

// Test fingerprint change
// Change fingerprint and try to read file again
// We do not expect fingerprint change
// We test both increasing and decreasing fingerprint size
reader.Config.FingerprintSize = maxFP * (lineLen / 3)
line := string(tokenWithLength(lineLen-1)) + "\n"
fileContent = append(fileContent, []byte(line)...)

filetest.WriteString(t, temp, line)
reader.ReadToEnd(context.Background())
require.Equal(t, fileContent[:expectedFP], reader.Fingerprint.FirstBytes)

reader.Config.FingerprintSize = maxFP / 2
line = string(tokenWithLength(lineLen-1)) + "\n"
fileContent = append(fileContent, []byte(line)...)

filetest.WriteString(t, temp, line)
reader.ReadToEnd(context.Background())
require.Equal(t, fileContent[:expectedFP], reader.Fingerprint.FirstBytes)
})
}
}

func TestEncodings(t *testing.T) {
t.Parallel()
cases := []struct {
Expand Down Expand Up @@ -1379,7 +1212,7 @@ func TestDeleteAfterRead(t *testing.T) {
// Write logs to each file
for i, temp := range temps {
for j := 0; j < linesPerFile; j++ {
line := tokenWithLength(100)
line := filetest.TokenWithLength(100)
message := fmt.Sprintf("%s %d %d", line, i, j)
_, err := temp.WriteString(message + "\n")
require.NoError(t, err)
Expand Down Expand Up @@ -1436,7 +1269,7 @@ func TestMaxBatching(t *testing.T) {
numExpectedTokens := expectedMaxFilesPerPoll * linesPerFile
for i, temp := range temps {
for j := 0; j < linesPerFile; j++ {
message := fmt.Sprintf("%s %d %d", tokenWithLength(100), i, j)
message := fmt.Sprintf("%s %d %d", filetest.TokenWithLength(100), i, j)
_, err := temp.WriteString(message + "\n")
require.NoError(t, err)
}
Expand All @@ -1451,7 +1284,7 @@ func TestMaxBatching(t *testing.T) {
// Write more logs to each file so we can validate that all files are still known
for i, temp := range temps {
for j := 0; j < linesPerFile; j++ {
message := fmt.Sprintf("%s %d %d", tokenWithLength(20), i, j)
message := fmt.Sprintf("%s %d %d", filetest.TokenWithLength(20), i, j)
_, err := temp.WriteString(message + "\n")
require.NoError(t, err)
}
Expand Down Expand Up @@ -1519,7 +1352,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {

longFile := filetest.OpenTemp(t, tempDir)
for line := 0; line < longFileLines; line++ {
_, err := longFile.WriteString(string(tokenWithLength(100)) + "\n")
_, err := longFile.WriteString(string(filetest.TokenWithLength(100)) + "\n")
require.NoError(t, err)
}
require.NoError(t, longFile.Close())
Expand Down
10 changes: 10 additions & 0 deletions pkg/stanza/fileconsumer/internal/filetest/filetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package filetest // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest"

import (
"math/rand"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -37,3 +38,12 @@ func WriteString(t testing.TB, file *os.File, s string) {
_, err := file.WriteString(s)
require.NoError(t, err)
}

func TokenWithLength(length int) []byte {
charset := "abcdefghijklmnopqrstuvwxyz"
b := make([]byte, length)
for i := range b {
b[i] = charset[rand.Intn(len(charset))]
}
return b
}
85 changes: 68 additions & 17 deletions pkg/stanza/fileconsumer/internal/reader/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
"time"

"github.com/stretchr/testify/require"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/unicode"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split"
Expand All @@ -18,32 +19,82 @@ import (
)

const (
defaultMaxLogSize = 1024 * 1024
defaultMaxConcurrentFiles = 1024
defaultEncoding = "utf-8"
defaultPollInterval = 200 * time.Millisecond
defaultFlushPeriod = 500 * time.Millisecond
defaultMaxLogSize = 1024 * 1024
defaultFlushPeriod = 500 * time.Millisecond
)

func testFactory(t *testing.T, sCfg split.Config, maxLogSize int, flushPeriod time.Duration) (*Factory, *emittest.Sink) {
enc, err := decode.LookupEncoding(defaultEncoding)
require.NoError(t, err)
func testFactory(t *testing.T, opts ...testFactoryOpt) (*Factory, *emittest.Sink) {
cfg := &testFactoryCfg{
fingerprintSize: fingerprint.DefaultSize,
fromBeginning: true,
maxLogSize: defaultMaxLogSize,
encoding: unicode.UTF8,
trimFunc: trim.Whitespace,
flushPeriod: defaultFlushPeriod,
sinkCallBufferSize: 100,
}
for _, opt := range opts {
opt(cfg)
}

splitFunc, err := sCfg.Func(enc, false, maxLogSize)
splitFunc, err := cfg.splitCfg.Func(cfg.encoding, false, cfg.maxLogSize)
require.NoError(t, err)

sink := emittest.NewSink()
sink := emittest.NewSink(emittest.WithCallBuffer(cfg.sinkCallBufferSize))
return &Factory{
SugaredLogger: testutil.Logger(t),
Config: &Config{
FingerprintSize: fingerprint.DefaultSize,
MaxLogSize: maxLogSize,
FingerprintSize: cfg.fingerprintSize,
MaxLogSize: cfg.maxLogSize,
FlushTimeout: cfg.flushPeriod,
Emit: sink.Callback,
FlushTimeout: flushPeriod,
},
FromBeginning: true,
Encoding: enc,
FromBeginning: cfg.fromBeginning,
Encoding: cfg.encoding,
SplitFunc: splitFunc,
TrimFunc: trim.Whitespace,
TrimFunc: cfg.trimFunc,
}, sink
}

type testFactoryOpt func(*testFactoryCfg)

type testFactoryCfg struct {
fingerprintSize int
fromBeginning bool
maxLogSize int
encoding encoding.Encoding
splitCfg split.Config
trimFunc trim.Func
flushPeriod time.Duration
sinkCallBufferSize int
}

func withFingerprintSize(size int) testFactoryOpt {
return func(c *testFactoryCfg) {
c.fingerprintSize = size
}
}

func withSplitConfig(cfg split.Config) testFactoryOpt {
return func(c *testFactoryCfg) {
c.splitCfg = cfg
}
}

func withMaxLogSize(maxLogSize int) testFactoryOpt {
return func(c *testFactoryCfg) {
c.maxLogSize = maxLogSize
}
}

func withFlushPeriod(flushPeriod time.Duration) testFactoryOpt {
return func(c *testFactoryCfg) {
c.flushPeriod = flushPeriod
}
}

func withSinkBufferSize(n int) testFactoryOpt {
return func(c *testFactoryCfg) {
c.sinkCallBufferSize = n
}
}
Loading

0 comments on commit 5d413ae

Please sign in to comment.