From 0a066d8a5c71386e56dee7bd7a21170b27e4283a Mon Sep 17 00:00:00 2001
From: Xianyang Liu
Date: Wed, 8 Nov 2023 23:45:06 +0800
Subject: [PATCH] PARQUET-2372: Avoid unnecessary reading of RowGroup data during rewriting (#1183)

---
 .../hadoop/rewrite/ParquetRewriter.java       | 22 ++++++++--------------
 1 file changed, 8 insertions(+), 14 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index bf2155e28c..004a1d135e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -265,14 +265,9 @@ public void processBlocks() throws IOException {
   }
 
   private void processBlocksFromReader(IndexCache indexCache) throws IOException {
-    PageReadStore store = reader.readNextRowGroup();
-    ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
-
-    int blockId = 0;
-    while (store != null) {
-      writer.startBlock(store.getRowCount());
-
+    for (int blockId = 0; blockId < meta.getBlocks().size(); blockId ++) {
       BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
+      writer.startBlock(blockMetaData.getRowCount());
       indexCache.setBlockMetadata(blockMetaData);
       List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
       for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
@@ -304,9 +299,9 @@ private void processBlocksFromReader(IndexCache indexCache) throws IOException {
                       "Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
             }
             nullifyColumn(
+                    blockId,
                     descriptor,
                     chunk,
-                    crStore,
                     writer,
                     schema,
                     newCodecName,
@@ -323,7 +318,7 @@ private void processBlocksFromReader(IndexCache indexCache) throws IOException {
           }
 
           // Translate compression and/or encryption
-          writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
+          writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
           processChunk(
                   chunk,
                   newCodecName,
@@ -345,9 +340,6 @@ private void processBlocksFromReader(IndexCache indexCache) throws IOException {
       }
 
       writer.endBlock();
-      store = reader.readNextRowGroup();
-      crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
-      blockId++;
       numBlocksRewritten++;
     }
   }
@@ -675,9 +667,9 @@ private Set<ColumnPath> convertToColumnPaths(List<String> cols) {
     return prunePaths;
   }
 
-  private void nullifyColumn(ColumnDescriptor descriptor,
+  private void nullifyColumn(int blockIndex,
+                             ColumnDescriptor descriptor,
                              ColumnChunkMetaData chunk,
-                             ColumnReadStoreImpl crStore,
                              ParquetFileWriter writer,
                              MessageType schema,
                              CompressionCodecName newCodecName,
@@ -688,6 +680,8 @@ private void nullifyColumn(ColumnDescriptor descriptor,
     long totalChunkValues = chunk.getValueCount();
     int dMax = descriptor.getMaxDefinitionLevel();
 
+    PageReadStore pageReadStore = reader.readRowGroup(blockIndex);
+    ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(pageReadStore, new DummyGroupConverter(), schema, originalCreatedBy);
     ColumnReader cReader = crStore.getColumnReader(descriptor);
 
     ParquetProperties.WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages() ?
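
Note (not part of the patch): a minimal sketch of how the rewrite path touched here is typically driven. With this change, only columns masked with MaskMode.NULLIFY cause a row group to be re-read (via reader.readRowGroup(blockIndex)); plain compression translation now uses chunk metadata only. The paths and the masked column name below are hypothetical, and the RewriteOptions builder methods are assumed from the org.apache.parquet.hadoop.rewrite API of this era.

    import java.util.Collections;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.hadoop.rewrite.MaskMode;
    import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;

    public class RewriteExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        RewriteOptions options = new RewriteOptions.Builder(
                conf, new Path("/tmp/input.parquet"), new Path("/tmp/output.parquet"))
            // Nullified column: the rewriter decodes it, so its row group is read.
            .mask(Collections.singletonMap("ssn", MaskMode.NULLIFY))
            // Recompressed columns: translated chunk-by-chunk without decoding values.
            .transform(CompressionCodecName.ZSTD)
            .build();
        try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
          rewriter.processBlocks();
        }
      }
    }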