Added support to read mixed encoded parquet files (#5176)
malhotrashivam authored Feb 24, 2024
1 parent 8f608df commit e29e7c6
Showing 12 changed files with 118 additions and 156 deletions.
@@ -61,8 +61,11 @@ interface ColumnPageReaderIterator {
interface ColumnPageDirectAccessor {
/**
* Directly access a page reader for a given page number.
*
* @param pageNum The page number to access.
* @param channelContext The channel context to use for constructing the reader
*/
ColumnPageReader getPageReader(int pageNum);
ColumnPageReader getPageReader(int pageNum, SeekableChannelContext channelContext);
}
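The new parameter lets the caller own and reuse a channel context across direct page lookups. A minimal usage sketch, with hypothetical stand-in declarations for the Deephaven types named in this hunk (the real interfaces have more members):

```java
import java.io.IOException;

// Hypothetical stand-ins for the Deephaven types referenced above, for illustration only.
interface SeekableChannelContext extends AutoCloseable {}

interface ColumnPageReader {
    long numRows(SeekableChannelContext channelContext) throws IOException;
}

interface ColumnPageDirectAccessor {
    // Signature as changed by this commit: the caller supplies the channel context.
    ColumnPageReader getPageReader(int pageNum, SeekableChannelContext channelContext);
}

final class PageRowCounter {
    // Reuse one context for every page in the chunk instead of building one per lookup.
    static long totalRows(final ColumnPageDirectAccessor accessor, final int pageCount,
            final SeekableChannelContext context) throws IOException {
        long total = 0;
        for (int pageNum = 0; pageNum < pageCount; pageNum++) {
            total += accessor.getPageReader(pageNum, context).numRows(context);
        }
        return total;
    }
}
```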

@@ -98,18 +98,13 @@ public int getMaxRl() {
return path.getMaxRepetitionLevel();
}

public final OffsetIndex getOffsetIndex() {
public OffsetIndex getOffsetIndex() {
return offsetIndex;
}

@Override
public ColumnPageReaderIterator getPageIterator() {
final long dataPageOffset = columnChunk.meta_data.getData_page_offset();
if (offsetIndex == null) {
return new ColumnPageReaderIteratorImpl(dataPageOffset, columnChunk.getMeta_data().getNum_values());
} else {
return new ColumnPageReaderIteratorIndexImpl();
}
return new ColumnPageReaderIteratorImpl();
}

@Override
@@ -230,9 +225,9 @@ private final class ColumnPageReaderIteratorImpl implements ColumnPageReaderIterator {
private long nextHeaderOffset;
private long remainingValues;

ColumnPageReaderIteratorImpl(final long startOffset, final long numValues) {
this.remainingValues = numValues;
this.nextHeaderOffset = startOffset;
ColumnPageReaderIteratorImpl() {
this.remainingValues = columnChunk.meta_data.getNum_values();
this.nextHeaderOffset = columnChunk.meta_data.getData_page_offset();
}

@Override
@@ -251,37 +246,41 @@ public ColumnPageReader next(@NotNull final SeekableChannelContext channelContext) {
final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext);
final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), getURI())) {
ch.position(headerOffset);
final PageHeader pageHeader;
try (final InputStream in = SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch)) {
pageHeader = Util.readPageHeader(in);
}
final PageHeader pageHeader = readPageHeader(ch);
// relying on exact position of ch
final long dataOffset = ch.position();
nextHeaderOffset = dataOffset + pageHeader.getCompressed_page_size();
if (pageHeader.isSetDictionary_page_header()) {
// Dictionary page; skip it
final PageType pageType = pageHeader.type;
if (pageType == PageType.DICTIONARY_PAGE && headerOffset == columnChunk.meta_data.getData_page_offset()
&& columnChunk.meta_data.getDictionary_page_offset() == 0) {
// https://stackoverflow.com/questions/55225108/why-is-dictionary-page-offset-0-for-plain-dictionary-encoding
// Skip the dictionary page and jump to the data page
return next(holder.get());
}
if (!pageHeader.isSetData_page_header() && !pageHeader.isSetData_page_header_v2()) {
throw new IllegalStateException(
"Expected data page, but neither v1 nor v2 data page header is set in file "
+ ch + " at offset " + headerOffset);
if (pageType != PageType.DATA_PAGE && pageType != PageType.DATA_PAGE_V2) {
throw new IllegalStateException("Expected data page, but got " + pageType + " at offset " +
headerOffset + " for file " + getURI());
}
remainingValues -= getNumValues(pageHeader);
final org.apache.parquet.format.Encoding encoding = getEncoding(pageHeader);
final int numValuesInPage = getNumValues(pageHeader);
remainingValues -= numValuesInPage;
final Function<SeekableChannelContext, Dictionary> pageDictionarySupplier =
(encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY)
? dictionarySupplier
: (SeekableChannelContext context) -> NULL_DICTIONARY;
return new ColumnPageReaderImpl(channelsProvider, decompressor,
pageDictionarySupplier, nullMaterializerFactory, path, getURI(), fieldTypes,
dataOffset, pageHeader, ColumnPageReaderImpl.NULL_NUM_VALUES);
getPageDictionarySupplier(pageHeader);
return new ColumnPageReaderImpl(channelsProvider, decompressor, pageDictionarySupplier,
nullMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader, numValuesInPage);
} catch (IOException e) {
throw new UncheckedDeephavenException("Error reading page header", e);
throw new UncheckedDeephavenException("Error reading page header at offset " + headerOffset + " for " +
"file " + getURI(), e);
}
}
}
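The dictionary-page branch in next() above covers writers that leave dictionary_page_offset at 0 and place the dictionary page at the reported data page offset (see the StackOverflow link in the comment); the iterator skips that page and moves on to the first real data page. A condensed sketch of just that check, with the page type and chunk metadata reduced to hypothetical parameters:

```java
// Hypothetical, simplified shapes for illustration; the real PageType comes from
// org.apache.parquet.format and the offsets from the column chunk metadata.
enum PageType { DICTIONARY_PAGE, DATA_PAGE, DATA_PAGE_V2 }

final class LeadingDictionaryPageCheck {
    // True when the page found at the data page offset is really a dictionary page written
    // with dictionary_page_offset left at 0; the iterator then skips it and reads the next
    // page header instead of rejecting the chunk.
    static boolean shouldSkip(final PageType pageType, final long headerOffset,
            final long dataPageOffset, final long dictionaryPageOffset) {
        return pageType == PageType.DICTIONARY_PAGE
                && headerOffset == dataPageOffset
                && dictionaryPageOffset == 0;
    }
}
```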

private Function<SeekableChannelContext, Dictionary> getPageDictionarySupplier(final PageHeader pageHeader) {
final org.apache.parquet.format.Encoding encoding = getEncoding(pageHeader);
return (encoding == PLAIN_DICTIONARY || encoding == RLE_DICTIONARY)
? dictionarySupplier
: (SeekableChannelContext context) -> NULL_DICTIONARY;
}
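This per-page decision is the core of the mixed-encoding support: each page header's encoding determines whether the chunk's dictionary is supplied, so dictionary-encoded pages and plain-encoded fallback pages can coexist in one column chunk. A hedged sketch of the same selection using hypothetical stand-in types:

```java
import java.util.function.Function;

// Hypothetical stand-ins for illustration; the real Encoding values come from
// org.apache.parquet.format and Dictionary from the Parquet column APIs.
enum Encoding { PLAIN, PLAIN_DICTIONARY, RLE_DICTIONARY }

interface Dictionary {}

interface ChannelContext {}

final class PageDictionaries {
    static final Dictionary NULL_DICTIONARY = new Dictionary() {};

    // Dictionary-encoded pages get the chunk dictionary; any other encoding (for example a
    // PLAIN page written after the dictionary overflowed) gets the null dictionary.
    static Function<ChannelContext, Dictionary> supplierFor(final Encoding pageEncoding,
            final Function<ChannelContext, Dictionary> chunkDictionarySupplier) {
        return (pageEncoding == Encoding.PLAIN_DICTIONARY || pageEncoding == Encoding.RLE_DICTIONARY)
                ? chunkDictionarySupplier
                : context -> NULL_DICTIONARY;
    }
}
```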

private static org.apache.parquet.format.Encoding getEncoding(PageHeader pageHeader) {
switch (pageHeader.type) {
case DATA_PAGE:
@@ -294,58 +293,51 @@ private static org.apache.parquet.format.Encoding getEncoding(PageHeader pageHeader) {
}
}

private PageHeader readPageHeader(final SeekableByteChannel ch) throws IOException {
try (final InputStream in = SeekableChannelsProvider.channelPositionInputStream(channelsProvider, ch)) {
return Util.readPageHeader(in);
}
}

private static int getNumValues(PageHeader pageHeader) {
return pageHeader.isSetData_page_header()
? pageHeader.getData_page_header().getNum_values()
: pageHeader.getData_page_header_v2().getNum_values();
}

private final class ColumnPageReaderIteratorIndexImpl implements ColumnPageReaderIterator {
private int pos;

ColumnPageReaderIteratorIndexImpl() {
pos = 0;
}

@Override
public boolean hasNext() {
return offsetIndex.getPageCount() > pos;
}

@Override
public ColumnPageReader next(@NotNull final SeekableChannelContext channelContext) {
if (!hasNext()) {
throw new NoSuchElementException("No next element");
}
// Following logic assumes that offsetIndex will store the number of values for a page instead of number
// of rows (which can be different for array and vector columns). This behavior is because of a bug on
// parquet writing side which got fixed in deephaven-core/pull/4844 and is only kept to support reading
// parquet files written before deephaven-core/pull/4844.
final int numValues = (int) (offsetIndex.getLastRowIndex(pos, columnChunk.getMeta_data().getNum_values())
- offsetIndex.getFirstRowIndex(pos) + 1);
final ColumnPageReader columnPageReader =
new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier,
nullMaterializerFactory, path, getURI(), fieldTypes, offsetIndex.getOffset(pos), null,
numValues);
pos++;
return columnPageReader;
}
}

private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor {

ColumnPageDirectAccessorImpl() {}

@Override
public ColumnPageReader getPageReader(final int pageNum) {
public ColumnPageReader getPageReader(final int pageNum, final SeekableChannelContext channelContext) {
if (pageNum < 0 || pageNum >= offsetIndex.getPageCount()) {
throw new IndexOutOfBoundsException(
"pageNum=" + pageNum + ", offsetIndex.getPageCount()=" + offsetIndex.getPageCount());
}
// Page header and number of values will be populated later when we read the page header from the file
return new ColumnPageReaderImpl(channelsProvider, decompressor, dictionarySupplier, nullMaterializerFactory,
path, getURI(), fieldTypes, offsetIndex.getOffset(pageNum), null,
ColumnPageReaderImpl.NULL_NUM_VALUES);

// Read the page header to determine whether we need to use dictionary for this page
final long headerOffset = offsetIndex.getOffset(pageNum);
try (
final ContextHolder holder = SeekableChannelContext.ensureContext(channelsProvider, channelContext);
final SeekableByteChannel ch = channelsProvider.getReadChannel(holder.get(), getURI())) {
ch.position(headerOffset);
final PageHeader pageHeader = readPageHeader(ch);
final long dataOffset = ch.position();
final PageType pageType = pageHeader.type;
if (pageType != PageType.DATA_PAGE && pageType != PageType.DATA_PAGE_V2) {
throw new IllegalStateException("Expected data page, but got " + pageType + " for page number "
+ pageNum + " at offset " + headerOffset + " for file " + getURI());
}
final Function<SeekableChannelContext, Dictionary> pageDictionarySupplier =
getPageDictionarySupplier(pageHeader);
return new ColumnPageReaderImpl(channelsProvider, decompressor, pageDictionarySupplier,
nullMaterializerFactory, path, getURI(), fieldTypes, dataOffset, pageHeader,
getNumValues(pageHeader));
} catch (final IOException e) {
throw new UncheckedDeephavenException("Error reading page header for page number " + pageNum +
" at offset " + headerOffset + " for file " + getURI(), e);
}
}
}
}
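At the API surface nothing changes; files whose column chunks mix dictionary-encoded and plain pages (as produced by writers that fall back from dictionary encoding mid-chunk) simply become readable. A sketch of reading such a file, assuming Deephaven's ParquetTools.readTable and a hypothetical file path:

```java
import io.deephaven.engine.table.Table;
import io.deephaven.parquet.table.ParquetTools;

public class ReadMixedEncodedParquet {
    public static void main(String[] args) {
        // Hypothetical path: a file whose column chunks contain both dictionary-encoded
        // and plain-encoded pages, e.g. written with dictionary-size fallback enabled.
        final Table table = ParquetTools.readTable("/data/mixed-encoded.parquet");
        System.out.println("rows = " + table.size());
    }
}
```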
@@ -18,10 +18,10 @@ public interface ColumnPageReader extends AutoCloseable {

/**
* @param channelContext The channel context to use for reading the parquet file
* @return The number of rows in this ColumnChunk, or -1 if it's unknown.
* @return The number of rows in this page, or -1 if it's unknown.
*/
default long numRows(final SeekableChannelContext channelContext) throws IOException {
return numValues(channelContext);
return numValues();
}

/**
@@ -47,10 +47,9 @@ IntBuffer readKeyValues(IntBuffer keyDest, int nullPlaceholder,
SeekableChannelContext channelContext) throws IOException;

/**
* @param channelContext The channel context to use for reading the parquet file
* @return The value stored under number DataPageHeader.num_values
* @return The number of values in this page
*/
int numValues(SeekableChannelContext channelContext) throws IOException;
int numValues();
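Since both access paths now read the page header before constructing a reader, the value count is known up front; numValues therefore drops the channel context and the checked IOException. A simplified sketch of the narrowed interface and a caller, with hypothetical stand-ins (the real interface has further methods):

```java
import java.io.IOException;
import java.util.List;

// Hypothetical stand-ins for illustration only.
interface SeekableChannelContext extends AutoCloseable {}

interface ColumnPageReader extends AutoCloseable {
    // Row count may still need the channel, so it keeps the context and IOException.
    long numRows(SeekableChannelContext channelContext) throws IOException;

    // After this commit the value count comes from the already-read page header.
    int numValues();
}

final class PageValueTotals {
    // Value counts no longer require a channel context or IOException handling.
    static int totalValues(final List<ColumnPageReader> pages) {
        int total = 0;
        for (final ColumnPageReader page : pages) {
            total += page.numValues();
        }
        return total;
    }
}
```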

/**
* @param channelContext The channel context to use for reading the parquet file