Avoid BufferedRowReaderIterator buffer re-allocation (#2139)
* Avoid BufferedRowReaderIterator buffer re-allocation

* Close underlying iterator on BufferedRowReaderIterator.Close

* Revert "Close underlying iterator on BufferedRowReaderIterator.Close"

This reverts commit fe5af58.

* Adjust default row buffer size

* Reset row in SliceRowReader before serialization
kolesnikovae authored Jul 21, 2023
1 parent c554f61 commit 1109379
Showing 2 changed files with 22 additions and 31 deletions.
46 changes: 17 additions & 29 deletions pkg/parquet/row_reader.go
@@ -12,7 +12,7 @@ import (
 )
 
 const (
-	defaultRowBufferSize = 1024
+	defaultRowBufferSize = 64
 )
 
 var (
@@ -98,53 +98,41 @@ func (it *IteratorRowReader) ReadRows(rows []parquet.Row) (int, error) {
 }
 
 type BufferedRowReaderIterator struct {
-	reader       parquet.RowReader
-	bufferedRows []parquet.Row
-
-	// buff keep the original slice capacity to avoid allocations
-	buff       []parquet.Row
-	bufferSize int
-	err        error
+	reader parquet.RowReader
+	buf    []parquet.Row
+	err    error
+	r      int
+	w      int
 }
 
 // NewBufferedRowReaderIterator returns a new `iter.Iterator[parquet.Row]` from a RowReader.
 // The iterator will buffer `bufferSize` rows from the reader.
 func NewBufferedRowReaderIterator(reader parquet.RowReader, bufferSize int) *BufferedRowReaderIterator {
 	return &BufferedRowReaderIterator{
-		reader:     reader,
-		bufferSize: bufferSize,
+		reader: reader,
+		buf:    make([]parquet.Row, bufferSize),
 	}
 }
 
 func (r *BufferedRowReaderIterator) Next() bool {
-	if len(r.bufferedRows) > 1 {
-		r.bufferedRows = r.bufferedRows[1:]
+	if r.r < r.w-1 {
+		r.r++
 		return true
 	}
-
-	// todo this seems to do allocations on every call since cap is always smaller
-	if cap(r.buff) < r.bufferSize {
-		r.buff = make([]parquet.Row, r.bufferSize)
-	}
-	r.bufferedRows = r.buff[:r.bufferSize]
-	n, err := r.reader.ReadRows(r.bufferedRows)
-	if err != nil && err != io.EOF {
+	var err error
+	if r.w, err = r.reader.ReadRows(r.buf); err != nil && err != io.EOF {
 		r.err = err
 		return false
 	}
-	if n == 0 {
-		return false
+	if r.w > 0 {
+		r.r = 0
+		return true
 	}
-
-	r.bufferedRows = r.bufferedRows[:n]
-	return true
+	return false
 }
 
 func (r *BufferedRowReaderIterator) At() parquet.Row {
-	if len(r.bufferedRows) == 0 {
-		return parquet.Row{}
-	}
-	return r.bufferedRows[0]
+	return r.buf[r.r]
 }
 
 func (r *BufferedRowReaderIterator) Err() error {
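
For context, a minimal usage sketch of the reworked iterator (not part of the commit; the consumeRows helper is hypothetical and the library import path is an assumption). Next advances the read cursor r through the pre-allocated buf and only calls ReadRows to refill that same slice once the cursor reaches the write mark w, so the buffer created by the constructor is reused for every batch instead of being reallocated per call.

package parquet

import "github.com/parquet-go/parquet-go" // assumed import path; match the module the repo actually uses

// consumeRows is an illustrative helper showing the intended call pattern.
func consumeRows(reader parquet.RowReader, handle func(parquet.Row)) error {
	it := NewBufferedRowReaderIterator(reader, defaultRowBufferSize)
	for it.Next() {
		// At returns buf[r]; the row stays valid only until the next refill
		// overwrites the shared buffer, so clone it if it must outlive the loop.
		handle(it.At())
	}
	return it.Err() // io.EOF is not surfaced; only real read errors are reported
}
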
7 changes: 5 additions & 2 deletions pkg/phlaredb/schemas/v1/profiles.go
@@ -178,8 +178,11 @@ func (r *SliceRowReader[T]) ReadRows(rows []parquet.Row) (n int, err error) {
 		err = io.EOF
 	}
 	for pos, p := range r.slice[:len(rows)] {
-		// serialize the row
-		rows[pos] = r.serialize(p, rows[pos])
+		// Serialize the row. Note that the row may
+		// be already initialized and contain values,
+		// therefore it must be reset.
+		row := rows[pos][:0]
+		rows[pos] = r.serialize(p, row)
 		n++
 	}
 	r.slice = r.slice[len(rows):]
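
The reset matters because serialization appends column values to the row it is given: with the buffer reuse above, ReadRows may receive rows that still hold values from a previous batch, and truncating to rows[pos][:0] keeps the backing array while discarding the stale values. A small sketch of the pattern (illustrative only; appendInt and the import path are assumptions):

package v1

import "github.com/parquet-go/parquet-go" // assumed import path; match the module the repo actually uses

// appendInt is a hypothetical helper demonstrating reset-before-reuse:
// truncating the destination row keeps its allocated capacity, so appending
// fresh values reuses memory instead of stacking them on top of old ones.
func appendInt(dst parquet.Row, n int64) parquet.Row {
	dst = dst[:0]                          // drop old values, keep capacity
	return append(dst, parquet.ValueOf(n)) // no allocation once capacity suffices
}
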
