Added buffered stream for all parquet writes (#4669)
malhotrashivam committed Oct 23, 2023
1 parent 43c10f4 commit 02c2633
Showing 4 changed files with 102 additions and 130 deletions.
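The core of the change is a position-tracking buffered stream that replaces the unbuffered SeekableByteChannel writes. The PositionedBufferedOutputStream class itself is not shown in the portion of the diff that loaded here (it is presumably one of the remaining changed files), so the following is only a minimal sketch of what such a class could look like, inferred from how the writers below use it: constructed from a channel plus a buffer size, and exposing position(), write(), flush() and close(). The class name suffix "Sketch" and the exact implementation are assumptions, not the committed code.

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;

// Sketch only: buffers writes to the channel and reports the logical write position,
// so callers can record page and footer offsets without flushing before every measurement.
final class PositionedBufferedOutputStreamSketch extends BufferedOutputStream {

    private final SeekableByteChannel writeChannel;

    PositionedBufferedOutputStreamSketch(final SeekableByteChannel writeChannel, final int bufferSize) {
        super(Channels.newOutputStream(writeChannel), bufferSize);
        this.writeChannel = writeChannel;
    }

    // Logical position = bytes already flushed to the channel + bytes still held in the buffer
    // ('count' is the protected buffered-byte counter inherited from BufferedOutputStream).
    long position() throws IOException {
        return writeChannel.position() + count;
    }
}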
ColumnWriterImpl.java
@@ -23,34 +23,24 @@
import org.apache.parquet.schema.PrimitiveType;
import org.jetbrains.annotations.NotNull;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.HashSet;
import java.util.Set;

import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
import static org.apache.parquet.format.Util.writePageHeader;

public class ColumnWriterImpl implements ColumnWriter {
public final class ColumnWriterImpl implements ColumnWriter {

private static final int MIN_SLAB_SIZE = 64;
private final SeekableByteChannel writeChannel;
/**
* Buffered stream that writes to {@link #writeChannel}. Used for buffering small writes, particularly page headers.
*/
private final BufferedOutputStream bufferedOutput;
/**
* Following is used to set the size of buffer for {@link #bufferedOutput}. In our testing, we found the page
* headers to be much smaller than 16 KB, so using that as the default size.
*/
private static final int EXPECTED_PAGE_HEADER_SIZE = 16 << 10;

private final PositionedBufferedOutputStream bufferedOutput;
private final ColumnDescriptor column;
private final RowGroupWriterImpl owner;
private final CompressorAdapter compressorAdapter;
@@ -78,13 +68,12 @@ public class ColumnWriterImpl implements ColumnWriter {

ColumnWriterImpl(
final RowGroupWriterImpl owner,
final SeekableByteChannel writeChannel,
final PositionedBufferedOutputStream bufferedOutput,
final ColumnDescriptor column,
final CompressorAdapter compressorAdapter,
final int targetPageSize,
final ByteBufferAllocator allocator) {
this.writeChannel = writeChannel;
bufferedOutput = new BufferedOutputStream(Channels.newOutputStream(writeChannel), EXPECTED_PAGE_HEADER_SIZE);
this.bufferedOutput = bufferedOutput;
this.column = column;
this.compressorAdapter = compressorAdapter;
this.targetPageSize = targetPageSize;
@@ -143,37 +132,36 @@ public void addDictionaryPage(@NotNull final Object dictionaryValues, final int

// noinspection unchecked
dictionaryWriter.writeBulk(dictionaryValues, valuesCount, NullStatistics.INSTANCE);
dictionaryOffset = writeChannel.position();
dictionaryOffset = bufferedOutput.position();
writeDictionaryPage(dictionaryWriter.getByteBufferView(), valuesCount);
pageCount++;
hasDictionary = true;
dictionaryPage = new DictionaryPageHeader(valuesCount, org.apache.parquet.format.Encoding.PLAIN);
}

public void writeDictionaryPage(final ByteBuffer dictionaryBuffer, final int valuesCount) throws IOException {
long currentChunkDictionaryPageOffset = writeChannel.position();
int uncompressedSize = dictionaryBuffer.remaining();
private void writeDictionaryPage(final ByteBuffer dictionaryBuffer, final int valuesCount) throws IOException {
final long currentChunkDictionaryPageOffset = bufferedOutput.position();
final int uncompressedSize = dictionaryBuffer.remaining();

compressorAdapter.reset();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (WritableByteChannel channel = Channels.newChannel(compressorAdapter.compress(baos))) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final WritableByteChannel channel = Channels.newChannel(compressorAdapter.compress(baos))) {
channel.write(dictionaryBuffer);
}
BytesInput compressedBytes = BytesInput.from(baos);
final BytesInput compressedBytes = BytesInput.from(baos);

int compressedPageSize = (int) compressedBytes.size();
final int compressedPageSize = (int) compressedBytes.size();

metadataConverter.writeDictionaryPageHeader(
uncompressedSize,
compressedPageSize,
valuesCount,
Encoding.PLAIN,
bufferedOutput);
bufferedOutput.flush();
long headerSize = writeChannel.position() - currentChunkDictionaryPageOffset;
final long headerSize = bufferedOutput.position() - currentChunkDictionaryPageOffset;
this.uncompressedLength += uncompressedSize + headerSize;
this.compressedLength += compressedPageSize + headerSize;
writeChannel.write(compressedBytes.toByteBuffer());
compressedBytes.writeAllTo(bufferedOutput);
encodings.add(Encoding.PLAIN);
}

@@ -183,7 +171,7 @@ private BulkWriter getWriter(final PrimitiveType primitiveType) {
case FIXED_LEN_BYTE_ARRAY:
throw new UnsupportedOperationException("No support for writing FIXED_LENGTH or INT96 types");
case INT32:
LogicalTypeAnnotation annotation = primitiveType.getLogicalTypeAnnotation();
final LogicalTypeAnnotation annotation = primitiveType.getLogicalTypeAnnotation();
if (annotation != null) {
// Appropriately set the null value for different type of integers
if (LogicalTypeAnnotation.intType(8, true).equals(annotation)) {
@@ -239,7 +227,7 @@ public void addVectorPage(
}
initWriter();
// noinspection unchecked
int valueCount =
final int valueCount =
bulkWriter.writeBulkVector(pageData, repeatCount, rlEncoder, dlEncoder, nonNullValueCount, statistics);
writePage(bulkWriter.getByteBufferView(), valueCount);
bulkWriter.reset();
@@ -271,12 +259,12 @@ private PageHeader newDataPageV2Header(
final int rlByteLength,
final int dlByteLength) {
// TODO: pageHeader.crc = ...;
DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
final DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
valueCount, nullCount, rowCount,
hasDictionary ? org.apache.parquet.format.Encoding.PLAIN_DICTIONARY
: org.apache.parquet.format.Encoding.PLAIN,
dlByteLength, rlByteLength);
PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
final PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
pageHeader.setData_page_header_v2(dataPageHeaderV2);
if (hasDictionary) {
pageHeader.setDictionary_page_header(dictionaryPage);
@@ -293,19 +281,19 @@ public void writePageV2(
final BytesInput repetitionLevels,
final BytesInput definitionLevels,
final ByteBuffer data) throws IOException {
int rlByteLength = (int) repetitionLevels.size();
int dlByteLength = (int) definitionLevels.size();
int uncompressedDataSize = data.remaining();
int uncompressedSize = (int) (uncompressedDataSize + repetitionLevels.size() + definitionLevels.size());
final int rlByteLength = (int) repetitionLevels.size();
final int dlByteLength = (int) definitionLevels.size();
final int uncompressedDataSize = data.remaining();
final int uncompressedSize = (int) (uncompressedDataSize + repetitionLevels.size() + definitionLevels.size());

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (WritableByteChannel channel = Channels.newChannel(compressorAdapter.compress(baos))) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final WritableByteChannel channel = Channels.newChannel(compressorAdapter.compress(baos))) {
channel.write(data);
}
BytesInput compressedData = BytesInput.from(baos);
int compressedSize = (int) (compressedData.size() + repetitionLevels.size() + definitionLevels.size());
final BytesInput compressedData = BytesInput.from(baos);
final int compressedSize = (int) (compressedData.size() + repetitionLevels.size() + definitionLevels.size());

long initialOffset = writeChannel.position();
final long initialOffset = bufferedOutput.position();
if (firstDataPageOffset == -1) {
firstDataPageOffset = initialOffset;
}
@@ -315,25 +303,24 @@ public void writePageV2(
rlByteLength,
dlByteLength,
bufferedOutput);
bufferedOutput.flush();
long headerSize = writeChannel.position() - initialOffset;
final long headerSize = bufferedOutput.position() - initialOffset;
this.uncompressedLength += (uncompressedSize + headerSize);
this.compressedLength += (compressedSize + headerSize);
this.totalValueCount += valueCount;
this.pageCount += 1;

writeChannel.write(definitionLevels.toByteBuffer());
writeChannel.write(compressedData.toByteBuffer());
definitionLevels.writeAllTo(bufferedOutput);
compressedData.writeAllTo(bufferedOutput);
}

private void writePage(final BytesInput bytes, final int valueCount, final Encoding valuesEncoding)
throws IOException {
long initialOffset = writeChannel.position();
final long initialOffset = bufferedOutput.position();
if (firstDataPageOffset == -1) {
firstDataPageOffset = initialOffset;
}

long uncompressedSize = bytes.size();
final long uncompressedSize = bytes.size();
if (uncompressedSize > Integer.MAX_VALUE) {
throw new ParquetEncodingException(
"Cannot write page larger than Integer.MAX_VALUE bytes: " +
@@ -342,13 +329,13 @@ private void writePage(final BytesInput bytes, final int valueCount, final Encod

compressorAdapter.reset();

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (OutputStream cos = compressorAdapter.compress(baos)) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final OutputStream cos = compressorAdapter.compress(baos)) {
bytes.writeAllTo(cos);
}
BytesInput compressedBytes = BytesInput.from(baos);
final BytesInput compressedBytes = BytesInput.from(baos);

long compressedSize = compressedBytes.size();
final long compressedSize = compressedBytes.size();
if (compressedSize > Integer.MAX_VALUE) {
throw new ParquetEncodingException(
"Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
@@ -360,15 +347,14 @@ private void writePage(final BytesInput bytes, final int valueCount, final Encod
valueCount,
valuesEncoding,
bufferedOutput);
bufferedOutput.flush();
long headerSize = writeChannel.position() - initialOffset;
final long headerSize = bufferedOutput.position() - initialOffset;
this.uncompressedLength += (uncompressedSize + headerSize);
this.compressedLength += (compressedSize + headerSize);
this.totalValueCount += valueCount;
this.pageCount += 1;

writeChannel.write(compressedBytes.toByteBuffer());
offsetIndexBuilder.add((int) (writeChannel.position() - initialOffset), valueCount);
compressedBytes.writeAllTo(bufferedOutput);
offsetIndexBuilder.add((int) (bufferedOutput.position() - initialOffset), valueCount);
encodings.add(valuesEncoding);
encodingStatsBuilder.addDataEncoding(valuesEncoding);
}
@@ -385,12 +371,12 @@ private void writeDataPageV1Header(
valuesEncoding), to);
}

private PageHeader newDataPageHeader(
private static PageHeader newDataPageHeader(
final int uncompressedSize,
final int compressedSize,
final int valueCount,
final Encoding valuesEncoding) {
PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);
final PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);

pageHeader.setData_page_header(new DataPageHeader(
valueCount,
@@ -409,11 +395,11 @@ private void writePage(final ByteBuffer encodedData, final long valueCount) {
try {
BytesInput bytes = BytesInput.from(encodedData);
if (dlEncoder != null) {
BytesInput dlBytesInput = dlEncoder.toBytes();
final BytesInput dlBytesInput = dlEncoder.toBytes();
bytes = BytesInput.concat(BytesInput.fromInt((int) dlBytesInput.size()), dlBytesInput, bytes);
}
if (rlEncoder != null) {
BytesInput rlBytesInput = rlEncoder.toBytes();
final BytesInput rlBytesInput = rlEncoder.toBytes();
bytes = BytesInput.concat(BytesInput.fromInt((int) rlBytesInput.size()), rlBytesInput, bytes);
}
writePage(
@@ -453,7 +439,7 @@ public ColumnDescriptor getColumn() {
return column;
}

public OffsetIndex getOffsetIndex() {
OffsetIndex getOffsetIndex() {
return offsetIndexBuilder.build(firstDataPageOffset);
}

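Throughout ColumnWriterImpl the same substitution recurs: bufferedOutput.position() replaces writeChannel.position() for offset bookkeeping, compressedBytes.writeAllTo(bufferedOutput) replaces writeChannel.write(compressedBytes.toByteBuffer()), and the explicit flush() that used to precede each header-size measurement is gone because the buffered stream's position already accounts for unflushed bytes. The following is a minimal, self-contained sketch of that bookkeeping, using a ByteArrayOutputStream as a stand-in for the buffered stream; the class name, the fake header bytes, and the payload are illustrative only.

import org.apache.parquet.bytes.BytesInput;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Stand-alone illustration of the header/payload bookkeeping pattern used in writePage():
// take the position before the header, derive the header size from the position afterwards,
// then stream the payload through the same output. No flush is needed to read the position.
public final class PageWriteSketch {
    public static void main(final String[] args) throws IOException {
        final ByteArrayOutputStream bufferedOutput = new ByteArrayOutputStream(); // stand-in for the buffered stream
        final BytesInput compressedBytes = BytesInput.from(new byte[] {1, 2, 3, 4}); // fake payload

        final long initialOffset = bufferedOutput.size();          // bufferedOutput.position()
        bufferedOutput.write(new byte[] {0x15, 0x00, 0x15, 0x08}); // pretend page-header bytes
        final long headerSize = bufferedOutput.size() - initialOffset;

        compressedBytes.writeAllTo(bufferedOutput);                 // payload follows the header
        System.out.println("header=" + headerSize + " bytes, total=" + bufferedOutput.size());
    }
}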
ParquetFileWriter.java
@@ -18,26 +18,22 @@
import org.apache.parquet.schema.MessageType;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.parquet.format.Util.writeFileMetaData;

public class ParquetFileWriter {
public final class ParquetFileWriter {
private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
private static final int VERSION = 1;
private static final int OUTPUT_BUFFER_SIZE = 1 << 18;

private final SeekableByteChannel writeChannel;
private final PositionedBufferedOutputStream bufferedOutput;
private final MessageType type;
private final int targetPageSize;
private final ByteBufferAllocator allocator;
private final SeekableChannelsProvider channelsProvider;
private final CompressorAdapter compressorAdapter;
private final Map<String, String> extraMetaData;
private final List<BlockMetaData> blocks = new ArrayList<>();
Expand All @@ -54,67 +50,54 @@ public ParquetFileWriter(
this.targetPageSize = targetPageSize;
this.allocator = allocator;
this.extraMetaData = new HashMap<>(extraMetaData);
writeChannel = channelsProvider.getWriteChannel(filePath, false); // TODO add support for appending
writeChannel.write(ByteBuffer.wrap(ParquetFileReader.MAGIC));
bufferedOutput = new PositionedBufferedOutputStream(channelsProvider.getWriteChannel(filePath, false),
OUTPUT_BUFFER_SIZE);
bufferedOutput.write(ParquetFileReader.MAGIC);
this.type = type;
this.channelsProvider = channelsProvider;
this.compressorAdapter = DeephavenCompressorAdapterFactory.getInstance().getByName(codecName);
}

@SuppressWarnings("unused")
RowGroupWriter addRowGroup(final String path, final boolean append) throws IOException {
RowGroupWriterImpl rowGroupWriter =
new RowGroupWriterImpl(path, append, channelsProvider, type, targetPageSize, allocator,
compressorAdapter);
blocks.add(rowGroupWriter.getBlock());
return rowGroupWriter;
}

public RowGroupWriter addRowGroup(final long size) {
RowGroupWriterImpl rowGroupWriter =
new RowGroupWriterImpl(writeChannel, type, targetPageSize, allocator, compressorAdapter);
final RowGroupWriterImpl rowGroupWriter =
new RowGroupWriterImpl(bufferedOutput, type, targetPageSize, allocator, compressorAdapter);
rowGroupWriter.getBlock().setRowCount(size);
blocks.add(rowGroupWriter.getBlock());
offsetIndexes.add(rowGroupWriter.offsetIndexes());
return rowGroupWriter;
}

public void close() throws IOException {
try (final OutputStream os = Channels.newOutputStream(writeChannel)) {
serializeOffsetIndexes(offsetIndexes, blocks, os);
ParquetMetadata footer =
new ParquetMetadata(new FileMetaData(type, extraMetaData, Version.FULL_VERSION), blocks);
serializeFooter(footer, os);
}
// os (and thus writeChannel) are closed at this point.

serializeOffsetIndexes();
final ParquetMetadata footer =
new ParquetMetadata(new FileMetaData(type, extraMetaData, Version.FULL_VERSION), blocks);
serializeFooter(footer);
// Flush any buffered data and close the channel
bufferedOutput.close();
compressorAdapter.close();
}

private void serializeFooter(final ParquetMetadata footer, final OutputStream os) throws IOException {
final long footerIndex = writeChannel.position();
org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(VERSION, footer);
writeFileMetaData(parquetMetadata, os);
BytesUtils.writeIntLittleEndian(os, (int) (writeChannel.position() - footerIndex));
os.write(ParquetFileReader.MAGIC);
private void serializeFooter(final ParquetMetadata footer) throws IOException {
final long footerIndex = bufferedOutput.position();
final org.apache.parquet.format.FileMetaData parquetMetadata =
metadataConverter.toParquetMetadata(VERSION, footer);
writeFileMetaData(parquetMetadata, bufferedOutput);
BytesUtils.writeIntLittleEndian(bufferedOutput, (int) (bufferedOutput.position() - footerIndex));
bufferedOutput.write(ParquetFileReader.MAGIC);
}

private void serializeOffsetIndexes(
final List<List<OffsetIndex>> offsetIndexes,
final List<BlockMetaData> blocks,
final OutputStream os) throws IOException {
private void serializeOffsetIndexes() throws IOException {
for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
final List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
final List<OffsetIndex> blockOffsetIndexes = offsetIndexes.get(bIndex);
for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
OffsetIndex offsetIndex = blockOffsetIndexes.get(cIndex);
final OffsetIndex offsetIndex = blockOffsetIndexes.get(cIndex);
if (offsetIndex == null) {
continue;
}
ColumnChunkMetaData column = columns.get(cIndex);
final long offset = writeChannel.position();
Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), os);
column.setOffsetIndexReference(new IndexReference(offset, (int) (writeChannel.position() - offset)));
final ColumnChunkMetaData column = columns.get(cIndex);
final long offset = bufferedOutput.position();
Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), bufferedOutput);
column.setOffsetIndexReference(new IndexReference(offset, (int) (bufferedOutput.position() - offset)));
}
}
}
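The close() path in ParquetFileWriter now writes the offset indexes and the footer through the same buffered stream and flushes once when the stream is closed. For context, the tail of a Parquet file is the Thrift-encoded FileMetaData, followed by its length as a 4-byte little-endian int, followed by the "PAR1" magic, which is the framing serializeFooter() emits. Below is a small sketch of that framing; the footer bytes are placeholders (the real ones come from writeFileMetaData()), and the class name is illustrative.

import org.apache.parquet.bytes.BytesUtils;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Sketch of the Parquet file tail written by serializeFooter(): footer bytes, footer length
// as a little-endian int, then the magic bytes.
public final class FooterTailSketch {
    public static void main(final String[] args) throws IOException {
        final ByteArrayOutputStream bufferedOutput = new ByteArrayOutputStream(); // stand-in for the buffered stream
        final byte[] fakeFooter = "thrift-encoded FileMetaData".getBytes(StandardCharsets.US_ASCII);

        final long footerIndex = bufferedOutput.size();                   // bufferedOutput.position()
        bufferedOutput.write(fakeFooter);                                 // writeFileMetaData(parquetMetadata, bufferedOutput)
        BytesUtils.writeIntLittleEndian(bufferedOutput, (int) (bufferedOutput.size() - footerIndex));
        bufferedOutput.write("PAR1".getBytes(StandardCharsets.US_ASCII)); // ParquetFileReader.MAGIC
        System.out.println("tail=" + bufferedOutput.size() + " bytes");
    }
}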
