From b43f61dfd134ec35de698137b24b62b7d4a2d4c1 Mon Sep 17 00:00:00 2001 From: Miccah Castorina Date: Fri, 29 Sep 2023 17:02:47 -0700 Subject: [PATCH] Fix bug in chunker that surfaces with a flaky passed in io.Reader The chunker was previously expecting the passed in io.Reader to always successfully read a full buffer of data, however it's valid for a Reader to return less data than requested. When this happens, the chunker would peek the same data that it then reads in the next iteration of the loop, causing the same data to be scanned twice. Co-authored-by: ahrav --- pkg/sources/chunker.go | 6 +++--- pkg/sources/chunker_test.go | 21 +++++++++++++++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/pkg/sources/chunker.go b/pkg/sources/chunker.go index 60d5c07d9753..ff108aa517d9 100644 --- a/pkg/sources/chunker.go +++ b/pkg/sources/chunker.go @@ -135,7 +135,7 @@ func readInChunks(ctx context.Context, reader io.Reader, config *chunkReaderConf chunkRes := ChunkResult{} chunkBytes := make([]byte, config.totalSize) chunkBytes = chunkBytes[:config.chunkSize] - n, err := chunkReader.Read(chunkBytes) + n, err := io.ReadFull(chunkReader, chunkBytes) if n > 0 { peekData, _ := chunkReader.Peek(config.totalSize - n) chunkBytes = append(chunkBytes[:n], peekData...) @@ -143,8 +143,8 @@ func readInChunks(ctx context.Context, reader io.Reader, config *chunkReaderConf } // If there is an error other than EOF, or if we have read some bytes, send the chunk. - if err != nil && !errors.Is(err, io.EOF) || n > 0 { - if err != nil && !errors.Is(err, io.EOF) { + if (err != nil && !errors.Is(err, io.ErrUnexpectedEOF)) || n > 0 { + if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) { ctx.Logger().Error(err, "error reading chunk") chunkRes.err = err } diff --git a/pkg/sources/chunker_test.go b/pkg/sources/chunker_test.go index 921aefa4f3cf..b179919edbf8 100644 --- a/pkg/sources/chunker_test.go +++ b/pkg/sources/chunker_test.go @@ -5,6 +5,7 @@ import ( "io" "strings" "testing" + "testing/iotest" diskbufferreader "github.com/bill-rich/disk-buffer-reader" "github.com/stretchr/testify/assert" @@ -196,3 +197,23 @@ func BenchmarkChunkReader(b *testing.B) { assert.Nil(b, err) } } + +func TestFlakyReader(t *testing.T) { + a := "aaaa" + b := "bbbb" + + reader := iotest.OneByteReader(strings.NewReader(a + b)) + + chunkReader := NewChunkReader() + chunkResChan := chunkReader(context.TODO(), reader) + + var chunks []ChunkResult + for chunk := range chunkResChan { + chunks = append(chunks, chunk) + } + + assert.Equal(t, 1, len(chunks)) + chunk := chunks[0] + assert.NoError(t, chunk.Error()) + assert.Equal(t, a+b, string(chunk.Bytes())) +}