Skip to content

Commit

Permalink
Fix bug in chunker that surfaces with a flaky passed in io.Reader
Browse files Browse the repository at this point in the history
The chunker was previously expecting the passed in io.Reader to always
successfully read a full buffer of data, however it's valid for a Reader
to return less data than requested. When this happens, the chunker would
peek the same data that it then reads in the next iteration of the loop,
causing the same data to be scanned twice.

Co-authored-by: ahrav <[email protected]>
  • Loading branch information
mcastorina and ahrav committed Sep 30, 2023
1 parent b9a582b commit b43f61d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
6 changes: 3 additions & 3 deletions pkg/sources/chunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,16 +135,16 @@ func readInChunks(ctx context.Context, reader io.Reader, config *chunkReaderConf
chunkRes := ChunkResult{}
chunkBytes := make([]byte, config.totalSize)
chunkBytes = chunkBytes[:config.chunkSize]
n, err := chunkReader.Read(chunkBytes)
n, err := io.ReadFull(chunkReader, chunkBytes)
if n > 0 {
peekData, _ := chunkReader.Peek(config.totalSize - n)
chunkBytes = append(chunkBytes[:n], peekData...)
chunkRes.data = chunkBytes
}

// If there is an error other than EOF, or if we have read some bytes, send the chunk.
if err != nil && !errors.Is(err, io.EOF) || n > 0 {
if err != nil && !errors.Is(err, io.EOF) {
if (err != nil && !errors.Is(err, io.ErrUnexpectedEOF)) || n > 0 {
if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) {
ctx.Logger().Error(err, "error reading chunk")
chunkRes.err = err
}
Expand Down
21 changes: 21 additions & 0 deletions pkg/sources/chunker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"strings"
"testing"
"testing/iotest"

diskbufferreader "github.com/bill-rich/disk-buffer-reader"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -196,3 +197,23 @@ func BenchmarkChunkReader(b *testing.B) {
assert.Nil(b, err)
}
}

func TestFlakyReader(t *testing.T) {
a := "aaaa"
b := "bbbb"

reader := iotest.OneByteReader(strings.NewReader(a + b))

chunkReader := NewChunkReader()
chunkResChan := chunkReader(context.TODO(), reader)

var chunks []ChunkResult
for chunk := range chunkResChan {
chunks = append(chunks, chunk)
}

assert.Equal(t, 1, len(chunks))
chunk := chunks[0]
assert.NoError(t, chunk.Error())
assert.Equal(t, a+b, string(chunk.Bytes()))
}

0 comments on commit b43f61d

Please sign in to comment.