diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java
index 35c4d4916c8..e17107a648b 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java
@@ -23,14 +23,12 @@
 import org.apache.parquet.schema.PrimitiveType;
 import org.jetbrains.annotations.NotNull;
 
-import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.IntBuffer;
 import java.nio.channels.Channels;
-import java.nio.channels.SeekableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.HashSet;
 import java.util.Set;
@@ -38,19 +36,11 @@
 import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
 import static org.apache.parquet.format.Util.writePageHeader;
 
-public class ColumnWriterImpl implements ColumnWriter {
+public final class ColumnWriterImpl implements ColumnWriter {
     private static final int MIN_SLAB_SIZE = 64;
-    private final SeekableByteChannel writeChannel;
-    /**
-     * Buffered stream that writes to {@link #writeChannel}. Used for buffering small writes, particularly page headers.
-     */
-    private final BufferedOutputStream bufferedOutput;
-    /**
-     * Following is used to set the size of buffer for {@link #bufferedOutput}. In our testing, we found the page
-     * headers to be much smaller than 16 KB, so using that as the default size.
-     */
-    private static final int EXPECTED_PAGE_HEADER_SIZE = 16 << 10;
+
+    private final PositionedBufferedOutputStream bufferedOutput;
     private final ColumnDescriptor column;
     private final RowGroupWriterImpl owner;
     private final CompressorAdapter compressorAdapter;
@@ -78,13 +68,12 @@ public class ColumnWriterImpl implements ColumnWriter {
 
     ColumnWriterImpl(
             final RowGroupWriterImpl owner,
-            final SeekableByteChannel writeChannel,
+            final PositionedBufferedOutputStream bufferedOutput,
            final ColumnDescriptor column,
             final CompressorAdapter compressorAdapter,
             final int targetPageSize,
             final ByteBufferAllocator allocator) {
-        this.writeChannel = writeChannel;
-        bufferedOutput = new BufferedOutputStream(Channels.newOutputStream(writeChannel), EXPECTED_PAGE_HEADER_SIZE);
+        this.bufferedOutput = bufferedOutput;
         this.column = column;
         this.compressorAdapter = compressorAdapter;
         this.targetPageSize = targetPageSize;
@@ -143,25 +132,25 @@ public void addDictionaryPage(@NotNull final Object dictionaryValues, final int
         // noinspection unchecked
         dictionaryWriter.writeBulk(dictionaryValues, valuesCount, NullStatistics.INSTANCE);
-        dictionaryOffset = writeChannel.position();
+        dictionaryOffset = bufferedOutput.position();
         writeDictionaryPage(dictionaryWriter.getByteBufferView(), valuesCount);
         pageCount++;
         hasDictionary = true;
         dictionaryPage = new DictionaryPageHeader(valuesCount, org.apache.parquet.format.Encoding.PLAIN);
     }
 
-    public void writeDictionaryPage(final ByteBuffer dictionaryBuffer, final int valuesCount) throws IOException {
-        long currentChunkDictionaryPageOffset = writeChannel.position();
-        int uncompressedSize = dictionaryBuffer.remaining();
+    private void writeDictionaryPage(final ByteBuffer dictionaryBuffer, final int valuesCount) throws IOException {
+        final long currentChunkDictionaryPageOffset = bufferedOutput.position();
+        final int uncompressedSize = dictionaryBuffer.remaining();
 
         compressorAdapter.reset();
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (WritableByteChannel channel = Channels.newChannel(compressorAdapter.compress(baos))) {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WritableByteChannel channel = Channels.newChannel(compressorAdapter.compress(baos))) {
             channel.write(dictionaryBuffer);
         }
-        BytesInput compressedBytes = BytesInput.from(baos);
+        final BytesInput compressedBytes = BytesInput.from(baos);
 
-        int compressedPageSize = (int) compressedBytes.size();
+        final int compressedPageSize = (int) compressedBytes.size();
         metadataConverter.writeDictionaryPageHeader(
                 uncompressedSize,
@@ -169,11 +158,10 @@ public void writeDictionaryPage(final ByteBuffer dictionaryBuffer, final int val
                 valuesCount,
                 Encoding.PLAIN,
                 bufferedOutput);
-        bufferedOutput.flush();
-        long headerSize = writeChannel.position() - currentChunkDictionaryPageOffset;
+        final long headerSize = bufferedOutput.position() - currentChunkDictionaryPageOffset;
         this.uncompressedLength += uncompressedSize + headerSize;
         this.compressedLength += compressedPageSize + headerSize;
-        writeChannel.write(compressedBytes.toByteBuffer());
+        compressedBytes.writeAllTo(bufferedOutput);
         encodings.add(Encoding.PLAIN);
     }
 
@@ -183,7 +171,7 @@ private BulkWriter getWriter(final PrimitiveType primitiveType) {
             case FIXED_LEN_BYTE_ARRAY:
                 throw new UnsupportedOperationException("No support for writing FIXED_LENGTH or INT96 types");
             case INT32:
-                LogicalTypeAnnotation annotation = primitiveType.getLogicalTypeAnnotation();
+                final LogicalTypeAnnotation annotation = primitiveType.getLogicalTypeAnnotation();
                 if (annotation != null) {
                     // Appropriately set the null value for different type of integers
                     if (LogicalTypeAnnotation.intType(8, true).equals(annotation)) {
@@ -239,7 +227,7 @@ public void addVectorPage(
         }
         initWriter();
         // noinspection unchecked
-        int valueCount =
+        final int valueCount =
                 bulkWriter.writeBulkVector(pageData, repeatCount, rlEncoder, dlEncoder, nonNullValueCount, statistics);
         writePage(bulkWriter.getByteBufferView(), valueCount);
         bulkWriter.reset();
@@ -271,12 +259,12 @@ private PageHeader newDataPageV2Header(
             final int rlByteLength,
             final int dlByteLength) {
         // TODO: pageHeader.crc = ...;
-        DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
+        final DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
                 valueCount,
                 nullCount,
                 rowCount,
                 hasDictionary ?
                         org.apache.parquet.format.Encoding.PLAIN_DICTIONARY : org.apache.parquet.format.Encoding.PLAIN,
                 dlByteLength,
                 rlByteLength);
-        PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
+        final PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
         pageHeader.setData_page_header_v2(dataPageHeaderV2);
         if (hasDictionary) {
             pageHeader.setDictionary_page_header(dictionaryPage);
@@ -293,19 +281,19 @@ public void writePageV2(
             final BytesInput repetitionLevels,
             final BytesInput definitionLevels,
             final ByteBuffer data) throws IOException {
-        int rlByteLength = (int) repetitionLevels.size();
-        int dlByteLength = (int) definitionLevels.size();
-        int uncompressedDataSize = data.remaining();
-        int uncompressedSize = (int) (uncompressedDataSize + repetitionLevels.size() + definitionLevels.size());
+        final int rlByteLength = (int) repetitionLevels.size();
+        final int dlByteLength = (int) definitionLevels.size();
+        final int uncompressedDataSize = data.remaining();
+        final int uncompressedSize = (int) (uncompressedDataSize + repetitionLevels.size() + definitionLevels.size());
 
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (WritableByteChannel channel = Channels.newChannel(compressorAdapter.compress(baos))) {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WritableByteChannel channel = Channels.newChannel(compressorAdapter.compress(baos))) {
             channel.write(data);
         }
-        BytesInput compressedData = BytesInput.from(baos);
-        int compressedSize = (int) (compressedData.size() + repetitionLevels.size() + definitionLevels.size());
+        final BytesInput compressedData = BytesInput.from(baos);
+        final int compressedSize = (int) (compressedData.size() + repetitionLevels.size() + definitionLevels.size());
 
-        long initialOffset = writeChannel.position();
+        final long initialOffset = bufferedOutput.position();
         if (firstDataPageOffset == -1) {
             firstDataPageOffset = initialOffset;
         }
@@ -315,25 +303,24 @@ public void writePageV2(
                 rlByteLength,
                 dlByteLength,
                 bufferedOutput);
-        bufferedOutput.flush();
-        long headerSize = writeChannel.position() - initialOffset;
+        final long headerSize = bufferedOutput.position() - initialOffset;
         this.uncompressedLength += (uncompressedSize + headerSize);
         this.compressedLength += (compressedSize + headerSize);
         this.totalValueCount += valueCount;
         this.pageCount += 1;
 
-        writeChannel.write(definitionLevels.toByteBuffer());
-        writeChannel.write(compressedData.toByteBuffer());
+        definitionLevels.writeAllTo(bufferedOutput);
+        compressedData.writeAllTo(bufferedOutput);
     }
 
     private void writePage(final BytesInput bytes, final int valueCount, final Encoding valuesEncoding)
             throws IOException {
-        long initialOffset = writeChannel.position();
+        final long initialOffset = bufferedOutput.position();
         if (firstDataPageOffset == -1) {
             firstDataPageOffset = initialOffset;
         }
-        long uncompressedSize = bytes.size();
+        final long uncompressedSize = bytes.size();
         if (uncompressedSize > Integer.MAX_VALUE) {
             throw new ParquetEncodingException(
                     "Cannot write page larger than Integer.MAX_VALUE bytes: " +
@@ -342,13 +329,13 @@ private void writePage(final BytesInput bytes, final int valueCount, final Encod
         compressorAdapter.reset();
 
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (OutputStream cos = compressorAdapter.compress(baos)) {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final OutputStream cos = compressorAdapter.compress(baos)) {
             bytes.writeAllTo(cos);
         }
-        BytesInput compressedBytes = BytesInput.from(baos);
+        final BytesInput compressedBytes = BytesInput.from(baos);
 
-        long compressedSize = compressedBytes.size();
+        final long compressedSize = compressedBytes.size();
         if (compressedSize > Integer.MAX_VALUE) {
             throw new ParquetEncodingException(
                     "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
@@ -360,15 +347,14 @@ private void writePage(final BytesInput bytes, final int valueCount, final Encod
                 valueCount,
                 valuesEncoding,
                 bufferedOutput);
-        bufferedOutput.flush();
-        long headerSize = writeChannel.position() - initialOffset;
+        final long headerSize = bufferedOutput.position() - initialOffset;
         this.uncompressedLength += (uncompressedSize + headerSize);
         this.compressedLength += (compressedSize + headerSize);
         this.totalValueCount += valueCount;
         this.pageCount += 1;
 
-        writeChannel.write(compressedBytes.toByteBuffer());
-        offsetIndexBuilder.add((int) (writeChannel.position() - initialOffset), valueCount);
+        compressedBytes.writeAllTo(bufferedOutput);
+        offsetIndexBuilder.add((int) (bufferedOutput.position() - initialOffset), valueCount);
         encodings.add(valuesEncoding);
         encodingStatsBuilder.addDataEncoding(valuesEncoding);
     }
@@ -385,12 +371,12 @@ private void writeDataPageV1Header(
                         valuesEncoding),
                 to);
     }
 
-    private PageHeader newDataPageHeader(
+    private static PageHeader newDataPageHeader(
             final int uncompressedSize,
             final int compressedSize,
             final int valueCount,
             final Encoding valuesEncoding) {
-        PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);
+        final PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);
         pageHeader.setData_page_header(new DataPageHeader(
                 valueCount,
@@ -409,11 +395,11 @@ private void writePage(final ByteBuffer encodedData, final long valueCount) {
         try {
             BytesInput bytes = BytesInput.from(encodedData);
             if (dlEncoder != null) {
-                BytesInput dlBytesInput = dlEncoder.toBytes();
+                final BytesInput dlBytesInput = dlEncoder.toBytes();
                 bytes = BytesInput.concat(BytesInput.fromInt((int) dlBytesInput.size()), dlBytesInput, bytes);
             }
             if (rlEncoder != null) {
-                BytesInput rlBytesInput = rlEncoder.toBytes();
+                final BytesInput rlBytesInput = rlEncoder.toBytes();
                 bytes = BytesInput.concat(BytesInput.fromInt((int) rlBytesInput.size()), rlBytesInput, bytes);
             }
             writePage(
@@ -453,7 +439,7 @@ public ColumnDescriptor getColumn() {
         return column;
     }
 
-    public OffsetIndex getOffsetIndex() {
+    OffsetIndex getOffsetIndex() {
         return offsetIndexBuilder.build(firstDataPageOffset);
     }
 
diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java
index b9f6a37b170..ef461e539e1 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileWriter.java
@@ -18,10 +18,6 @@
 import org.apache.parquet.schema.MessageType;
 
 import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.SeekableByteChannel;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -29,15 +25,15 @@
 import static org.apache.parquet.format.Util.writeFileMetaData;
 
-public class ParquetFileWriter {
+public final class ParquetFileWriter {
     private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
     private static final int VERSION = 1;
+    private static final int OUTPUT_BUFFER_SIZE = 1 << 18;
 
-    private final SeekableByteChannel writeChannel;
+    private final PositionedBufferedOutputStream bufferedOutput;
     private final MessageType type;
     private final int targetPageSize;
     private final ByteBufferAllocator allocator;
-    private final SeekableChannelsProvider channelsProvider;
     private final CompressorAdapter compressorAdapter;
     private final Map<String, String> extraMetaData;
     private final List<BlockMetaData> blocks = new ArrayList<>();
@@ -54,25 +50,16 @@ public ParquetFileWriter(
         this.targetPageSize = targetPageSize;
         this.allocator = allocator;
         this.extraMetaData = new HashMap<>(extraMetaData);
-        writeChannel = channelsProvider.getWriteChannel(filePath, false); // TODO add support for appending
-        writeChannel.write(ByteBuffer.wrap(ParquetFileReader.MAGIC));
+        bufferedOutput = new PositionedBufferedOutputStream(channelsProvider.getWriteChannel(filePath, false),
+                OUTPUT_BUFFER_SIZE);
+        bufferedOutput.write(ParquetFileReader.MAGIC);
         this.type = type;
-        this.channelsProvider = channelsProvider;
         this.compressorAdapter = DeephavenCompressorAdapterFactory.getInstance().getByName(codecName);
     }
 
-    @SuppressWarnings("unused")
-    RowGroupWriter addRowGroup(final String path, final boolean append) throws IOException {
-        RowGroupWriterImpl rowGroupWriter =
-                new RowGroupWriterImpl(path, append, channelsProvider, type, targetPageSize, allocator,
-                        compressorAdapter);
-        blocks.add(rowGroupWriter.getBlock());
-        return rowGroupWriter;
-    }
-
     public RowGroupWriter addRowGroup(final long size) {
-        RowGroupWriterImpl rowGroupWriter =
-                new RowGroupWriterImpl(writeChannel, type, targetPageSize, allocator, compressorAdapter);
+        final RowGroupWriterImpl rowGroupWriter =
+                new RowGroupWriterImpl(bufferedOutput, type, targetPageSize, allocator, compressorAdapter);
         rowGroupWriter.getBlock().setRowCount(size);
         blocks.add(rowGroupWriter.getBlock());
         offsetIndexes.add(rowGroupWriter.offsetIndexes());
@@ -80,41 +67,37 @@ public RowGroupWriter addRowGroup(final long size) {
 
     public void close() throws IOException {
-        try (final OutputStream os = Channels.newOutputStream(writeChannel)) {
-            serializeOffsetIndexes(offsetIndexes, blocks, os);
-            ParquetMetadata footer =
-                    new ParquetMetadata(new FileMetaData(type, extraMetaData, Version.FULL_VERSION), blocks);
-            serializeFooter(footer, os);
-        }
-        // os (and thus writeChannel) are closed at this point.
-
+        serializeOffsetIndexes();
+        final ParquetMetadata footer =
+                new ParquetMetadata(new FileMetaData(type, extraMetaData, Version.FULL_VERSION), blocks);
+        serializeFooter(footer);
+        // Flush any buffered data and close the channel
+        bufferedOutput.close();
         compressorAdapter.close();
     }
 
-    private void serializeFooter(final ParquetMetadata footer, final OutputStream os) throws IOException {
-        final long footerIndex = writeChannel.position();
-        org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(VERSION, footer);
-        writeFileMetaData(parquetMetadata, os);
-        BytesUtils.writeIntLittleEndian(os, (int) (writeChannel.position() - footerIndex));
-        os.write(ParquetFileReader.MAGIC);
+    private void serializeFooter(final ParquetMetadata footer) throws IOException {
+        final long footerIndex = bufferedOutput.position();
+        final org.apache.parquet.format.FileMetaData parquetMetadata =
+                metadataConverter.toParquetMetadata(VERSION, footer);
+        writeFileMetaData(parquetMetadata, bufferedOutput);
+        BytesUtils.writeIntLittleEndian(bufferedOutput, (int) (bufferedOutput.position() - footerIndex));
+        bufferedOutput.write(ParquetFileReader.MAGIC);
     }
 
-    private void serializeOffsetIndexes(
-            final List<List<OffsetIndex>> offsetIndexes,
-            final List<BlockMetaData> blocks,
-            final OutputStream os) throws IOException {
+    private void serializeOffsetIndexes() throws IOException {
         for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
             final List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
             final List<OffsetIndex> blockOffsetIndexes = offsetIndexes.get(bIndex);
             for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
-                OffsetIndex offsetIndex = blockOffsetIndexes.get(cIndex);
+                final OffsetIndex offsetIndex = blockOffsetIndexes.get(cIndex);
                 if (offsetIndex == null) {
                     continue;
                 }
-                ColumnChunkMetaData column = columns.get(cIndex);
-                final long offset = writeChannel.position();
-                Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), os);
-                column.setOffsetIndexReference(new IndexReference(offset, (int) (writeChannel.position() - offset)));
+                final ColumnChunkMetaData column = columns.get(cIndex);
+                final long offset = bufferedOutput.position();
+                Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), bufferedOutput);
+                column.setOffsetIndexReference(new IndexReference(offset, (int) (bufferedOutput.position() - offset)));
             }
         }
     }
diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PositionedBufferedOutputStream.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PositionedBufferedOutputStream.java
new file mode 100644
index 00000000000..f343f0bcb2e
--- /dev/null
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/PositionedBufferedOutputStream.java
@@ -0,0 +1,24 @@
+package io.deephaven.parquet.base;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.SeekableByteChannel;
+
+final class PositionedBufferedOutputStream extends BufferedOutputStream {
+
+    private final SeekableByteChannel writeChannel;
+
+    PositionedBufferedOutputStream(final SeekableByteChannel writeChannel, final int size) {
+        super(Channels.newOutputStream(writeChannel), size);
+        this.writeChannel = writeChannel;
+    }
+
+    /**
+     * Get the total number of bytes written to this stream
+     */
+    long position() throws IOException {
+        // Number of bytes buffered in the stream + bytes written to the underlying channel
+        return this.count + writeChannel.position();
+    }
+}
diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java
index bdd30bfe4d5..0e82df9b995 100644
--- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java
+++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java
@@ -3,7 +3,6 @@
  */
 package io.deephaven.parquet.base;
 
-import io.deephaven.parquet.base.util.SeekableChannelsProvider;
 import io.deephaven.parquet.compress.CompressorAdapter;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -13,14 +12,12 @@
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 
-import java.io.IOException;
-import java.nio.channels.SeekableByteChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 public class RowGroupWriterImpl implements RowGroupWriter {
-    private final SeekableByteChannel writeChannel;
+    private final PositionedBufferedOutputStream bufferedOutput;
     private final MessageType type;
     private final int targetPageSize;
     private final ByteBufferAllocator allocator;
@@ -29,40 +26,22 @@ public class RowGroupWriterImpl implements RowGroupWriter {
     private final List<OffsetIndex> currentOffsetIndexes = new ArrayList<>();
     private final CompressorAdapter compressorAdapter;
 
-    RowGroupWriterImpl(String path,
-            boolean append,
-            SeekableChannelsProvider channelsProvider,
-            MessageType type,
-            int targetPageSize,
-            ByteBufferAllocator allocator,
-            CompressorAdapter compressorAdapter)
-            throws IOException {
-        this(channelsProvider.getWriteChannel(path, append), type, targetPageSize, allocator, blockWithPath(path),
-                compressorAdapter);
-    }
-
-    private static BlockMetaData blockWithPath(String path) {
-        BlockMetaData blockMetaData = new BlockMetaData();
-        blockMetaData.setPath(path);
-        return blockMetaData;
-    }
-
-    RowGroupWriterImpl(SeekableByteChannel writeChannel,
+    RowGroupWriterImpl(PositionedBufferedOutputStream bufferedOutput,
             MessageType type,
             int targetPageSize,
             ByteBufferAllocator allocator,
             CompressorAdapter compressorAdapter) {
-        this(writeChannel, type, targetPageSize, allocator, new BlockMetaData(), compressorAdapter);
+        this(bufferedOutput, type, targetPageSize, allocator, new BlockMetaData(), compressorAdapter);
     }
 
-    private RowGroupWriterImpl(SeekableByteChannel writeChannel,
+    private RowGroupWriterImpl(PositionedBufferedOutputStream bufferedOutput,
             MessageType type,
             int targetPageSize,
             ByteBufferAllocator allocator,
             BlockMetaData blockMetaData,
             CompressorAdapter compressorAdapter) {
-        this.writeChannel = writeChannel;
+        this.bufferedOutput = bufferedOutput;
         this.type = type;
         this.targetPageSize = targetPageSize;
         this.allocator = allocator;
@@ -93,7 +72,7 @@ public ColumnWriter addColumn(String columnName) {
                     + " need to close that before opening a writer for " + columnName);
         }
         activeWriter = new ColumnWriterImpl(this,
-                writeChannel,
+                bufferedOutput,
                 type.getColumnDescription(getPrimitivePath(columnName)),
                 compressorAdapter,
                 targetPageSize,
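
Reviewer note (not part of the patch): the refactor replaces the SeekableByteChannel plumbing, and the flush-before-position() step it required, with a single shared PositionedBufferedOutputStream whose logical position is the bytes already pushed to the channel plus the bytes still sitting in the BufferedOutputStream buffer. A minimal, self-contained sketch of that idea follows; the CountingBufferedStream class and the temp-file setup are illustrative stand-ins written for this note, not code from this repository.

    import java.io.BufferedOutputStream;
    import java.io.IOException;
    import java.nio.channels.Channels;
    import java.nio.channels.SeekableByteChannel;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    final class PositionDemo {
        // Same idea as PositionedBufferedOutputStream above: the logical write position is
        // (bytes already written to the channel) + (bytes still held in the buffer).
        static final class CountingBufferedStream extends BufferedOutputStream {
            private final SeekableByteChannel channel;

            CountingBufferedStream(final SeekableByteChannel channel, final int size) {
                super(Channels.newOutputStream(channel), size);
                this.channel = channel;
            }

            long position() throws IOException {
                return this.count + channel.position();
            }
        }

        public static void main(final String[] args) throws IOException {
            final Path tmp = Files.createTempFile("position-demo", ".bin");
            try (SeekableByteChannel channel =
                    Files.newByteChannel(tmp, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
                final CountingBufferedStream out = new CountingBufferedStream(channel, 1 << 18);
                out.write(new byte[100]);           // stays in the buffer; nothing reaches the channel yet
                System.out.println(out.position()); // 100 (count = 100, channel.position() = 0)
                out.flush();
                System.out.println(out.position()); // still 100 (count = 0, channel.position() = 100)
                out.close();
            } finally {
                Files.deleteIfExists(tmp);
            }
        }
    }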