diff --git a/pkg/sources/chunker.go b/pkg/sources/chunker.go index 60d5c07d9753..ff108aa517d9 100644 --- a/pkg/sources/chunker.go +++ b/pkg/sources/chunker.go @@ -135,7 +135,7 @@ func readInChunks(ctx context.Context, reader io.Reader, config *chunkReaderConf chunkRes := ChunkResult{} chunkBytes := make([]byte, config.totalSize) chunkBytes = chunkBytes[:config.chunkSize] - n, err := chunkReader.Read(chunkBytes) + n, err := io.ReadFull(chunkReader, chunkBytes) if n > 0 { peekData, _ := chunkReader.Peek(config.totalSize - n) chunkBytes = append(chunkBytes[:n], peekData...) @@ -143,8 +143,8 @@ func readInChunks(ctx context.Context, reader io.Reader, config *chunkReaderConf } // If there is an error other than EOF, or if we have read some bytes, send the chunk. - if err != nil && !errors.Is(err, io.EOF) || n > 0 { - if err != nil && !errors.Is(err, io.EOF) { + if (err != nil && !errors.Is(err, io.ErrUnexpectedEOF)) || n > 0 { + if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) { ctx.Logger().Error(err, "error reading chunk") chunkRes.err = err } diff --git a/pkg/sources/chunker_test.go b/pkg/sources/chunker_test.go index 921aefa4f3cf..b179919edbf8 100644 --- a/pkg/sources/chunker_test.go +++ b/pkg/sources/chunker_test.go @@ -5,6 +5,7 @@ import ( "io" "strings" "testing" + "testing/iotest" diskbufferreader "github.com/bill-rich/disk-buffer-reader" "github.com/stretchr/testify/assert" @@ -196,3 +197,23 @@ func BenchmarkChunkReader(b *testing.B) { assert.Nil(b, err) } } + +func TestFlakyReader(t *testing.T) { + a := "aaaa" + b := "bbbb" + + reader := iotest.OneByteReader(strings.NewReader(a + b)) + + chunkReader := NewChunkReader() + chunkResChan := chunkReader(context.TODO(), reader) + + var chunks []ChunkResult + for chunk := range chunkResChan { + chunks = append(chunks, chunk) + } + + assert.Equal(t, 1, len(chunks)) + chunk := chunks[0] + assert.NoError(t, chunk.Error()) + assert.Equal(t, a+b, string(chunk.Bytes())) +}