Commit 0a066d8
PARQUET-2372: Avoid unnecessary reading of RowGroup data during rewriting (#1183)

ConeyLiu authored Nov 8, 2023
1 parent 56f40e4; commit 0a066d8
Showing 1 changed file with 8 additions and 14 deletions.
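For orientation, here is a minimal sketch of the access-pattern change the hunks below make: the rewrite loop now runs off footer metadata and defers decoding a row group until a column actually has to be re-read. The Reader interface and names in this sketch are hypothetical stand-ins for ParquetFileReader and BlockMetaData, not the actual Parquet API.

// Sketch only: hypothetical stand-ins, not the real ParquetFileReader API.
import java.util.List;

public class RewriteLoopSketch {

  interface Reader {
    List<Long> blockRowCounts();         // cheap: comes from the file footer
    byte[] readRowGroup(int blockIndex); // expensive: decodes page data
  }

  // Old shape: every row group was decoded just to drive the loop.
  static void rewriteEager(Reader reader) {
    for (int blockId = 0; blockId < reader.blockRowCounts().size(); blockId++) {
      byte[] pages = reader.readRowGroup(blockId); // always paid
      long rows = reader.blockRowCounts().get(blockId);
      // ... chunks are then mostly copied from metadata; `pages` often unused
    }
  }

  // New shape: the loop runs on metadata; pages are decoded only on demand.
  static void rewriteLazy(Reader reader, boolean mustNullifyColumn) {
    for (int blockId = 0; blockId < reader.blockRowCounts().size(); blockId++) {
      long rows = reader.blockRowCounts().get(blockId);
      if (mustNullifyColumn) {
        byte[] pages = reader.readRowGroup(blockId); // paid only when needed
        // ... re-read and nullify the target column from `pages`
      }
      // otherwise chunks are translated straight from the existing file
    }
  }
}
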
@@ -265,14 +265,9 @@ public void processBlocks() throws IOException {
   }
 
   private void processBlocksFromReader(IndexCache indexCache) throws IOException {
-    PageReadStore store = reader.readNextRowGroup();
-    ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
-
-    int blockId = 0;
-    while (store != null) {
-      writer.startBlock(store.getRowCount());
-
+    for (int blockId = 0; blockId < meta.getBlocks().size(); blockId ++) {
       BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
+      writer.startBlock(blockMetaData.getRowCount());
       indexCache.setBlockMetadata(blockMetaData);
       List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
       for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
@@ -304,9 +299,9 @@ private void processBlocksFromReader(IndexCache indexCache) throws IOException {
               "Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
         }
         nullifyColumn(
+            blockId,
             descriptor,
             chunk,
-            crStore,
             writer,
             schema,
             newCodecName,
@@ -323,7 +318,7 @@ private void processBlocksFromReader(IndexCache indexCache) throws IOException {
         }
 
         // Translate compression and/or encryption
-        writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
+        writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
         processChunk(
             chunk,
             newCodecName,
@@ -345,9 +340,6 @@ private void processBlocksFromReader(IndexCache indexCache) throws IOException {
       }
 
       writer.endBlock();
-      store = reader.readNextRowGroup();
-      crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
-      blockId++;
       numBlocksRewritten++;
     }
   }
@@ -675,9 +667,9 @@ private Set<ColumnPath> convertToColumnPaths(List<String> cols) {
     return prunePaths;
   }
 
-  private void nullifyColumn(ColumnDescriptor descriptor,
+  private void nullifyColumn(int blockIndex,
+                             ColumnDescriptor descriptor,
                              ColumnChunkMetaData chunk,
-                             ColumnReadStoreImpl crStore,
                              ParquetFileWriter writer,
                              MessageType schema,
                              CompressionCodecName newCodecName,
@@ -688,6 +680,8 @@ private void nullifyColumn(ColumnDescriptor descriptor,
 
     long totalChunkValues = chunk.getValueCount();
     int dMax = descriptor.getMaxDefinitionLevel();
+    PageReadStore pageReadStore = reader.readRowGroup(blockIndex);
+    ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(pageReadStore, new DummyGroupConverter(), schema, originalCreatedBy);
     ColumnReader cReader = crStore.getColumnReader(descriptor);
 
     ParquetProperties.WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages() ?
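In short: row counts and value counts now come from footer metadata (blockMetaData.getRowCount() and chunk.getValueCount()) instead of from a decoded PageReadStore, and reader.readRowGroup(blockIndex) is called only inside nullifyColumn. A rewrite that merely translates compression or encryption therefore never has to decode row-group page data.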
