Further improvements to parquet page materialization (#5670)
Also fixes a crash when reading from Parquet with decoders
malhotrashivam committed Jun 26, 2024
1 parent 2d89461 commit f165357
Showing 62 changed files with 683 additions and 348 deletions.
16 changes: 12 additions & 4 deletions Util/src/main/java/io/deephaven/util/codec/MapCodec.java
@@ -115,11 +115,10 @@ private ByteBuffer encodeIntoBuffer(final ByteBuffer scratch, @NotNull Map<K, V>

@Nullable
@Override
public Map<K, V> decode(@NotNull final byte[] input, final int offset, final int length) {
if (input.length == 0) {
public Map<K, V> decode(@NotNull final ByteBuffer byteBuffer) {
if (byteBuffer.remaining() == 0) {
return null;
}
final ByteBuffer byteBuffer = ByteBuffer.wrap(input);
final int size = byteBuffer.getInt();
if (size == 0) {
return Collections.emptyMap();
@@ -129,13 +128,22 @@ public Map<K, V> decode(@NotNull final byte[] input, final int offset, final int
final V value = decodeValue(byteBuffer);
return Collections.singletonMap(key, value);
}
final LinkedHashMap<K, V> result = new LinkedHashMap<>(size);
final Map<K, V> result = new LinkedHashMap<>(size);
for (int ii = 0; ii < size; ++ii) {
result.put(decodeKey(byteBuffer), decodeValue(byteBuffer));
}
return Collections.unmodifiableMap(result);
}

@Nullable
@Override
public Map<K, V> decode(@NotNull final byte[] input, final int offset, final int length) {
if (input.length == 0) {
return null;
}
return decode(ByteBuffer.wrap(input, offset, length));
}

/**
* Estimate the size of the encoded map.
*
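The refactor above makes the ByteBuffer overload the primary decode path, with the byte[] overload wrapping its slice and delegating. A minimal sketch of the same pattern applied to a hypothetical codec (not part of this commit; the length-prefixed UTF-8 format is invented for illustration):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec sketch mirroring the MapCodec change: ByteBuffer is the
// primary decode path, and the byte[] overload wraps the slice and delegates.
public final class StringCodecSketch {

    // Primary path: read a length-prefixed UTF-8 string directly from the buffer.
    public static String decode(final ByteBuffer buffer) {
        if (buffer.remaining() == 0) {
            return null;
        }
        final int length = buffer.getInt();
        final byte[] utf8 = new byte[length];
        buffer.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // The byte[] overload delegates, exactly as MapCodec#decode(byte[], int, int) now does.
    public static String decode(final byte[] input, final int offset, final int length) {
        if (input.length == 0) {
            return null;
        }
        return decode(ByteBuffer.wrap(input, offset, length));
    }
}

A round trip then behaves identically whichever overload the caller reaches for.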
22 changes: 21 additions & 1 deletion Util/src/main/java/io/deephaven/util/codec/ObjectDecoder.java
@@ -7,6 +7,8 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.nio.ByteBuffer;

/**
* <p>
* Codec superinterface for Object translation from byte arrays for serialization and deserialization.
@@ -30,12 +32,30 @@ public interface ObjectDecoder<TYPE> {
*
* @param input The input byte array containing bytes to decode
* @param offset The offset into the byte array to start decoding from
* @param length The length of the byte array to decode from, starting at the offset
* @param length The number of bytes to decode, starting at the offset
* @return The output object, possibly null
*/
@Nullable
TYPE decode(@NotNull byte[] input, int offset, int length);

/**
* Decode an object from a ByteBuffer. The position of the input buffer may or may not be modified by this method.
*
* @param buffer The input ByteBuffer containing bytes to decode
* @return The output object, possibly null
*/
@Nullable
default TYPE decode(@NotNull final ByteBuffer buffer) {
if (buffer.hasArray()) {
return decode(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
} else {
// Make a copy of the buffer's contents
final byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return decode(bytes, 0, bytes.length);
}
}

/**
* What width byte array does this ObjectCodec expect to encode and decode?
*
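The new default method only takes a zero-copy path when the buffer exposes a backing array; direct or read-only buffers fall through to the copy branch. A standalone sketch of that behavior (the decodeBytes helper stands in for an existing byte[]-based decoder and is not part of the interface):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public final class ByteBufferDecodeSketch {

    // Stand-in for an existing byte[]-based decoder (hypothetical).
    static String decodeBytes(final byte[] input, final int offset, final int length) {
        return new String(input, offset, length, StandardCharsets.UTF_8);
    }

    // Mirrors the ObjectDecoder default: use the backing array when available,
    // otherwise copy the remaining bytes out of the buffer.
    static String decode(final ByteBuffer buffer) {
        if (buffer.hasArray()) {
            return decodeBytes(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
        }
        final byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes); // advances the buffer's position
        return decodeBytes(bytes, 0, bytes.length);
    }

    public static void main(String[] args) {
        final byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(ByteBuffer.wrap(payload)));      // heap buffer: no copy
        final ByteBuffer direct = ByteBuffer.allocateDirect(payload.length);
        direct.put(payload).flip();
        System.out.println(decode(direct));                        // direct buffer: copy fallback
    }
}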
@@ -49,7 +49,9 @@ public byte[] encode(@Nullable final byte[] input) {
if (input == null) {
throw new IllegalArgumentException(SimpleByteArrayCodec.class.getSimpleName() + " cannot encode nulls");
}
return input;
final byte[] output = new byte[input.length];
System.arraycopy(input, 0, output, 0, input.length);
return output;
}

@Override
@@ -73,9 +75,6 @@ public byte[] decode(@NotNull final byte[] input, final int offset, final int le
if (input.length == 0) {
return CollectionUtil.ZERO_LENGTH_BYTE_ARRAY;
}
if (offset == 0 && length == input.length) {
return input;
}
final byte[] output = new byte[length];
System.arraycopy(input, offset, output, 0, length);
return output;
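The SimpleByteArrayCodec change above stops encode and decode from returning arrays that alias their input. A short sketch (not from the diff) of the hazard the defensive copy removes:

import java.util.Arrays;

public final class DefensiveCopySketch {

    // Old behavior: hand the caller the same array we were given.
    static byte[] encodeAliasing(final byte[] input) {
        return input;
    }

    // New behavior: always return a fresh copy.
    static byte[] encodeCopying(final byte[] input) {
        return Arrays.copyOf(input, input.length);
    }

    public static void main(String[] args) {
        final byte[] source = {1, 2, 3};

        final byte[] aliased = encodeAliasing(source);
        aliased[0] = 99;                             // silently corrupts the source
        System.out.println(Arrays.toString(source)); // [99, 2, 3]

        final byte[] copied = encodeCopying(source);
        copied[0] = 42;                              // touches only the copy
        System.out.println(Arrays.toString(source)); // still [99, 2, 3]
    }
}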
@@ -71,9 +71,10 @@ interface ColumnPageReaderIterator {
}

/**
* @param pageMaterializerFactory The factory to use for constructing page materializers.
* @return An iterator over individual parquet pages.
*/
ColumnPageReaderIterator getPageIterator() throws IOException;
ColumnPageReaderIterator getPageIterator(PageMaterializerFactory pageMaterializerFactory) throws IOException;

interface ColumnPageDirectAccessor {
/**
@@ -86,9 +87,10 @@ interface ColumnPageDirectAccessor {
}

/**
* @param pageMaterializerFactory The factory to use for constructing page materializers.
* @return An accessor for individual parquet pages which uses the provided offset index.
*/
ColumnPageDirectAccessor getPageAccessor(OffsetIndex offsetIndex);
ColumnPageDirectAccessor getPageAccessor(OffsetIndex offsetIndex, PageMaterializerFactory pageMaterializerFactory);

/**
* @return Whether this column chunk uses a dictionary-based encoding on every page.
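With these signature changes the caller supplies the materializer factory instead of the chunk reader deriving it from the column's PrimitiveType. A hedged usage sketch; the package names, the nesting of the iterator/accessor interfaces, and the resolveFactory idea are assumptions based on this excerpt, not confirmed by the diff:

import java.io.IOException;

import io.deephaven.parquet.base.ColumnChunkReader;
import io.deephaven.parquet.base.PageMaterializerFactory;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

final class PageIterationSketch {

    // `reader`, `offsetIndex`, and `factory` are assumed to be obtained elsewhere;
    // the point is only that the caller now chooses the factory.
    static void readColumn(final ColumnChunkReader reader,
            final OffsetIndex offsetIndex,
            final PageMaterializerFactory factory) throws IOException {
        final ColumnChunkReader.ColumnPageReaderIterator pages = reader.getPageIterator(factory);
        // ... consume pages ...

        // Call sites that never materialize values can pass the new
        // PageMaterializerFactory.NULL_FACTORY (added later in this commit) instead.
        final ColumnChunkReader.ColumnPageDirectAccessor accessor =
                reader.getPageAccessor(offsetIndex, PageMaterializerFactory.NULL_FACTORY);
        // ... access pages by index ...
    }
}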
@@ -48,7 +48,6 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
private final OffsetIndexReader offsetIndexReader;
private final List<Type> fieldTypes;
private final Function<SeekableChannelContext, Dictionary> dictionarySupplier;
private final PageMaterializerFactory pageMaterializerFactory;
private final URI columnChunkURI;
/**
* Number of rows in the row group of this column chunk.
@@ -81,7 +80,6 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
}
this.fieldTypes = fieldTypes;
this.dictionarySupplier = new SoftCachingFunction<>(this::getDictionary);
this.pageMaterializerFactory = PageMaterializer.factoryForType(path.getPrimitiveType());
this.numRows = numRows;
this.version = version;
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
@@ -130,16 +128,18 @@ public OffsetIndex getOffsetIndex(final SeekableChannelContext context) {
}

@Override
public ColumnPageReaderIterator getPageIterator() {
return new ColumnPageReaderIteratorImpl();
public ColumnPageReaderIterator getPageIterator(final PageMaterializerFactory pageMaterializerFactory) {
return new ColumnPageReaderIteratorImpl(pageMaterializerFactory);
}

@Override
public ColumnPageDirectAccessor getPageAccessor(final OffsetIndex offsetIndex) {
public ColumnPageDirectAccessor getPageAccessor(
final OffsetIndex offsetIndex,
final PageMaterializerFactory pageMaterializerFactory) {
if (offsetIndex == null) {
throw new UnsupportedOperationException("Cannot use direct accessor without offset index");
}
return new ColumnPageDirectAccessorImpl(offsetIndex);
return new ColumnPageDirectAccessorImpl(offsetIndex, pageMaterializerFactory);
}

@Override
@@ -247,10 +247,12 @@ private Dictionary readDictionary(long dictionaryPageOffset, SeekableChannelCont
private final class ColumnPageReaderIteratorImpl implements ColumnPageReaderIterator {
private long nextHeaderOffset;
private long remainingValues;
PageMaterializerFactory pageMaterializerFactory;

ColumnPageReaderIteratorImpl() {
ColumnPageReaderIteratorImpl(final PageMaterializerFactory pageMaterializerFactory) {
this.remainingValues = columnChunk.meta_data.getNum_values();
this.nextHeaderOffset = columnChunk.meta_data.getData_page_offset();
this.pageMaterializerFactory = pageMaterializerFactory;
}

@Override
@@ -335,9 +337,12 @@ private static int getNumValues(PageHeader pageHeader) {
private final class ColumnPageDirectAccessorImpl implements ColumnPageDirectAccessor {

private final OffsetIndex offsetIndex;
private final PageMaterializerFactory pageMaterializerFactory;

ColumnPageDirectAccessorImpl(final OffsetIndex offsetIndex) {
ColumnPageDirectAccessorImpl(@NotNull final OffsetIndex offsetIndex,
@NotNull final PageMaterializerFactory pageMaterializerFactory) {
this.offsetIndex = offsetIndex;
this.pageMaterializerFactory = pageMaterializerFactory;
}

@Override
@@ -345,7 +345,7 @@ private IntBuffer readKeysFromPageCommon(
final RunLengthBitPackingHybridBufferDecoder rlDecoder,
final RunLengthBitPackingHybridBufferDecoder dlDecoder,
final ValuesReader dataReader) throws IOException {
final Object result = materialize(IntMaterializer.Factory, dlDecoder, rlDecoder, dataReader, nullPlaceholder);
final Object result = materialize(IntMaterializer.FACTORY, dlDecoder, rlDecoder, dataReader, nullPlaceholder);
if (result instanceof DataWithOffsets) {
keyDest.put((int[]) ((DataWithOffsets) result).materializeResult);
return ((DataWithOffsets) result).offsets;
@@ -3,148 +3,8 @@
//
package io.deephaven.parquet.base;

import io.deephaven.parquet.base.materializers.*;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.jetbrains.annotations.NotNull;

import java.math.BigDecimal;
import java.math.BigInteger;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

public interface PageMaterializer {

/**
* Get the internal type used by Deephaven to represent a Parquet
* {@link LogicalTypeAnnotation.DecimalLogicalTypeAnnotation Decimal} logical type
*/
static Class<?> resolveDecimalLogicalType(
final LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
// This pair of values (precision=1, scale=0) is set at write time as a marker so that we can recover
// the fact that the type is a BigInteger, not a BigDecimal, when the files are read.
if (decimalLogicalType.getPrecision() == 1 && decimalLogicalType.getScale() == 0) {
return BigInteger.class;
}
return BigDecimal.class;
}

static PageMaterializerFactory factoryForType(@NotNull final PrimitiveType primitiveType) {
final PrimitiveType.PrimitiveTypeName primitiveTypeName = primitiveType.getPrimitiveTypeName();
final LogicalTypeAnnotation logicalTypeAnnotation = primitiveType.getLogicalTypeAnnotation();
switch (primitiveTypeName) {
case INT32:
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
final LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType =
(LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalTypeAnnotation;
if (intLogicalType.isSigned()) {
switch (intLogicalType.getBitWidth()) {
case 8:
return ByteMaterializer.Factory;
case 16:
return ShortMaterializer.Factory;
case 32:
return IntMaterializer.Factory;
}
} else {
switch (intLogicalType.getBitWidth()) {
case 8:
case 16:
return CharMaterializer.Factory;
case 32:
return LongFromUnsignedIntMaterializer.Factory;
}
}
} else if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DateLogicalTypeAnnotation) {
return LocalDateMaterializer.Factory;
} else if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
final LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType =
(LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalTypeAnnotation;
if (timeLogicalType.getUnit() != LogicalTypeAnnotation.TimeUnit.MILLIS) {
throw new IllegalArgumentException(
"Expected unit type to be MILLIS, found " + timeLogicalType.getUnit());
}
// isAdjustedToUTC parameter is ignored while reading LocalTime from Parquet files
return LocalTimeFromMillisMaterializer.Factory;
} else if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
final LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType =
(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
return new BigDecimalFromIntMaterializer.Factory(decimalLogicalType.getScale());
}
return IntMaterializer.Factory;
case INT64:
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
final LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType =
(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalTypeAnnotation;
if (timestampLogicalType.isAdjustedToUTC()) {
// The column will store nanoseconds elapsed since epoch as long values
switch (timestampLogicalType.getUnit()) {
case MILLIS:
return InstantNanosFromMillisMaterializer.Factory;
case MICROS:
return InstantNanosFromMicrosMaterializer.Factory;
case NANOS:
return LongMaterializer.Factory;
}
} else {
// The column will be stored as LocalDateTime values
// Ref:https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#local-semantics-timestamps-not-normalized-to-utc
switch (timestampLogicalType.getUnit()) {
case MILLIS:
return LocalDateTimeFromMillisMaterializer.Factory;
case MICROS:
return LocalDateTimeFromMicrosMaterializer.Factory;
case NANOS:
return LocalDateTimeFromNanosMaterializer.Factory;
}
}
} else if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
final LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType =
(LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalTypeAnnotation;
// isAdjustedToUTC parameter is ignored while reading LocalTime from Parquet files
switch (timeLogicalType.getUnit()) {
case MICROS:
return LocalTimeFromMicrosMaterializer.Factory;
case NANOS:
return LocalTimeFromNanosMaterializer.Factory;
default:
throw new IllegalArgumentException("Unsupported unit=" + timeLogicalType.getUnit());
}
} else if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
final LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType =
(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
return new BigDecimalFromLongMaterializer.Factory(decimalLogicalType.getScale());
}
return LongMaterializer.Factory;
case INT96:
return InstantFromInt96Materializer.Factory;
case FLOAT:
return FloatMaterializer.Factory;
case DOUBLE:
return DoubleMaterializer.Factory;
case BOOLEAN:
return BoolMaterializer.Factory;
case BINARY:
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
return StringMaterializer.Factory;
}
case FIXED_LEN_BYTE_ARRAY: // fall through
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
final LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType =
(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
final int encodedSizeInBytes = primitiveTypeName == BINARY ? -1 : primitiveType.getTypeLength();
if (resolveDecimalLogicalType(decimalLogicalType) == BigInteger.class) {
return new BigIntegerMaterializer.Factory(new BigIntegerParquetBytesCodec(encodedSizeInBytes));
}
return new BigDecimalFromBytesMaterializer.Factory(new BigDecimalParquetBytesCodec(
decimalLogicalType.getPrecision(), decimalLogicalType.getScale(), encodedSizeInBytes));
}
return BlobMaterializer.Factory;
default:
throw new RuntimeException("Unexpected type name:" + primitiveTypeName);
}
}

void fillNulls(int startIndex, int endIndex);

void fillValues(int startIndex, int endIndex);
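The comment in the block above documents a convention that is easy to miss: a Decimal logical type written with precision = 1 and scale = 0 is a marker meaning the column holds BigInteger values rather than BigDecimal. A standalone restatement of that check (illustrative only, not the project's code):

import java.math.BigDecimal;
import java.math.BigInteger;

final class DecimalMarkerSketch {

    // Mirrors resolveDecimalLogicalType above: (precision=1, scale=0) is the
    // write-time marker for BigInteger; anything else decodes as BigDecimal.
    static Class<?> resolve(final int precision, final int scale) {
        return (precision == 1 && scale == 0) ? BigInteger.class : BigDecimal.class;
    }

    public static void main(String[] args) {
        System.out.println(resolve(1, 0));   // class java.math.BigInteger
        System.out.println(resolve(38, 10)); // class java.math.BigDecimal
    }
}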
@@ -9,4 +9,16 @@ public interface PageMaterializerFactory {
PageMaterializer makeMaterializerWithNulls(ValuesReader dataReader, Object nullValue, int numValues);

PageMaterializer makeMaterializerNonNull(ValuesReader dataReader, int numValues);

PageMaterializerFactory NULL_FACTORY = new PageMaterializerFactory() {
@Override
public PageMaterializer makeMaterializerWithNulls(ValuesReader dataReader, Object nullValue, int numValues) {
throw new UnsupportedOperationException("Does not support materializing pages");
}

@Override
public PageMaterializer makeMaterializerNonNull(ValuesReader dataReader, int numValues) {
throw new UnsupportedOperationException("Does not support materializing pages");
}
};
}
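NULL_FACTORY appears intended for call sites that must now supply a factory but never materialize page data; any attempt to build a materializer through it fails fast. A minimal sketch of that failure mode (imports assumed from this excerpt):

import io.deephaven.parquet.base.PageMaterializerFactory;

final class NullFactorySketch {
    public static void main(String[] args) {
        try {
            // The ValuesReader argument is never touched, so null is fine here.
            PageMaterializerFactory.NULL_FACTORY.makeMaterializerNonNull(null, 0);
        } catch (UnsupportedOperationException expected) {
            System.out.println(expected.getMessage()); // Does not support materializing pages
        }
    }
}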
@@ -49,8 +49,7 @@ private BigDecimalFromBytesMaterializer(ValuesReader dataReader, BigDecimal null
@Override
public void fillValues(int startIndex, int endIndex) {
for (int ii = startIndex; ii < endIndex; ii++) {
final byte[] bytes = dataReader.readBytes().getBytes();
data[ii] = codec.decode(bytes, 0, bytes.length);
data[ii] = codec.decode(dataReader.readBytes().toByteBuffer());
}
}
}
@@ -53,8 +53,7 @@ private BigIntegerMaterializer(ValuesReader dataReader, BigInteger nullValue, in
@Override
public void fillValues(int startIndex, int endIndex) {
for (int ii = startIndex; ii < endIndex; ii++) {
final byte[] bytes = dataReader.readBytes().getBytes();
data[ii] = codec.decode(bytes, 0, bytes.length);
data[ii] = codec.decode(dataReader.readBytes().toByteBuffer());
}
}
}
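Both materializer changes above swap a per-value byte[] round trip for a decode straight off Binary#toByteBuffer, which the new ObjectDecoder overload can consume without an extra copy when the view is heap-backed. An illustration of the two paths using plain UTF-8 decoding in place of the BigDecimal/BigInteger codecs (the codec substitution is for brevity only):

import java.nio.charset.StandardCharsets;

import org.apache.parquet.io.api.Binary;

final class BinaryDecodeSketch {
    public static void main(String[] args) {
        final Binary binary = Binary.fromString("123456789");

        // Old pattern: materialize an intermediate byte[] first, then decode it.
        final byte[] bytes = binary.getBytes();
        final String viaArray = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8);

        // New pattern: decode directly from the Binary's ByteBuffer view.
        final String viaBuffer = StandardCharsets.UTF_8.decode(binary.toByteBuffer()).toString();

        System.out.println(viaArray.equals(viaBuffer)); // true
    }
}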
@@ -8,9 +8,12 @@
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;

/**
* Materializer for binary data.
*/
public class BlobMaterializer extends ObjectMaterializerBase<Binary> implements PageMaterializer {

public static final PageMaterializerFactory Factory = new PageMaterializerFactory() {
public static final PageMaterializerFactory FACTORY = new PageMaterializerFactory() {
@Override
public PageMaterializer makeMaterializerWithNulls(ValuesReader dataReader, Object nullValue, int numValues) {
return new BlobMaterializer(dataReader, (Binary) nullValue, numValues);