
Added support for partitioned parquet writing
malhotrashivam committed Feb 27, 2024
1 parent 7a3652d commit 1f41eef
Showing 5 changed files with 287 additions and 24 deletions.
@@ -10,6 +10,7 @@
import io.deephaven.hash.KeyedObjectKey;
import io.deephaven.util.annotations.VisibleForTesting;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@@ -117,8 +118,46 @@ public static int getDefaultTargetPageSize() {

static final String DEFAULT_METADATA_ROOT_DIR = ""; // Empty = No metadata files written

private static final MessageType DEFAULT_COMMON_SCHEMA = null; // No common schema

public ParquetInstructions() {}

/**
* Create a new ParquetInstructions object by copying all fields from the base instructions, and then adding the
* provided common schema. This method is intended for internal use and is therefore package-private.
*
* @param baseInstructions The base instructions to copy from
* @param commonSchema The common schema to add
* @return A new ParquetInstructions object with the common schema added
*/
static ParquetInstructions createWithCommonSchema(@NotNull final ParquetInstructions baseInstructions,
@NotNull final MessageType commonSchema) {
if (!(baseInstructions instanceof ReadOnly)) {
throw new UnsupportedOperationException("Cannot add common schema to non-ReadOnly ParquetInstructions");
}
if (baseInstructions.getCommonSchema() != DEFAULT_COMMON_SCHEMA) {
throw new UnsupportedOperationException("Cannot add common schema to ParquetInstructions with existing " +
"common schema");
}
if (DEFAULT_METADATA_ROOT_DIR.equals(baseInstructions.getMetadataRootDir())) {
throw new UnsupportedOperationException("Cannot add common schema to ParquetInstructions without any" +
" metadata root directory");
}
final ReadOnly readOnly = (ReadOnly) baseInstructions;
return new ReadOnly(
readOnly.copyColumnNameToInstructions(),
readOnly.copyParquetColumnNameToInstructions(),
readOnly.getCompressionCodecName(),
readOnly.getMaximumDictionaryKeys(),
readOnly.getMaximumDictionarySize(),
readOnly.isLegacyParquet(),
readOnly.getTargetPageSize(),
readOnly.isRefreshing(),
readOnly.getSpecialInstructions(),
readOnly.getMetadataRootDir(),
commonSchema);
}
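
A minimal usage sketch of this package-private factory, assuming writeInstructions is a ReadOnly instance built with a metadata root directory and commonSchema covers every partition's columns (both variable names are illustrative, not part of this change):

// writeInstructions and commonSchema are assumed to be in scope; see the note above.
final ParquetInstructions withCommonSchema =
        ParquetInstructions.createWithCommonSchema(writeInstructions, commonSchema);
// Throws UnsupportedOperationException if writeInstructions already carries a common schema
// or was built without a metadata root directory.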

public final String getColumnNameFromParquetColumnNameOrDefault(final String parquetColumnName) {
final String mapped = getColumnNameFromParquetColumnName(parquetColumnName);
return (mapped != null) ? mapped : parquetColumnName;
@@ -171,6 +210,11 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
*/
public abstract String getMetadataRootDir();

/**
* @return the common schema used for writing the _common_metadata file.
*/
abstract MessageType getCommonSchema();

@VisibleForTesting
public static boolean sameColumnNamesAndCodecMappings(final ParquetInstructions i1, final ParquetInstructions i2) {
if (i1 == EMPTY) {
@@ -250,6 +294,11 @@ public boolean isRefreshing() {
public String getMetadataRootDir() {
return DEFAULT_METADATA_ROOT_DIR;
}

@Override
public MessageType getCommonSchema() {
return DEFAULT_COMMON_SCHEMA;
}
};

private static class ColumnInstructions {
@@ -319,6 +368,7 @@ private static final class ReadOnly extends ParquetInstructions {
private final boolean isRefreshing;
private final Object specialInstructions;
private final String metadataRootDir;
private final MessageType commonSchema;

private ReadOnly(
final KeyedObjectHashMap<String, ColumnInstructions> columnNameToInstructions,
@@ -330,7 +380,8 @@ private ReadOnly(
final int targetPageSize,
final boolean isRefreshing,
final Object specialInstructions,
final String metadataRootDir) {
final String metadataRootDir,
final MessageType commonSchema) {
this.columnNameToInstructions = columnNameToInstructions;
this.parquetColumnNameToInstructions = parquetColumnNameToColumnName;
this.compressionCodecName = compressionCodecName;
@@ -341,6 +392,7 @@ private ReadOnly(
this.isRefreshing = isRefreshing;
this.specialInstructions = specialInstructions;
this.metadataRootDir = metadataRootDir;
this.commonSchema = commonSchema;
}

private String getOrDefault(final String columnName, final String defaultValue,
@@ -439,6 +491,11 @@ public String getMetadataRootDir() {
return metadataRootDir;
}

@Override
public MessageType getCommonSchema() {
return commonSchema;
}

KeyedObjectHashMap<String, ColumnInstructions> copyColumnNameToInstructions() {
// noinspection unchecked
return (columnNameToInstructions == null)
@@ -686,7 +743,7 @@ public ParquetInstructions build() {
parquetColumnNameToInstructions = null;
return new ReadOnly(columnNameToInstructionsOut, parquetColumnNameToColumnNameOut, compressionCodecName,
maximumDictionaryKeys, maximumDictionarySize, isLegacyParquet, targetPageSize, isRefreshing,
specialInstructions, metadataRootDir);
specialInstructions, metadataRootDir, DEFAULT_COMMON_SCHEMA);
}
}

@@ -14,6 +14,8 @@
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.File;
import java.io.IOException;
@@ -47,9 +49,26 @@ private static class ParquetFileMetadata {
private final String metadataRootDirAbsPath;
private final List<ParquetFileMetadata> parquetFileMetadataList;
private final SeekableChannelsProvider channelsProvider;
private List<ColumnTypeInfo> columnTypes; // Useful when merging deephaven specific metadata
private final MessageType commonSchema;

ParquetMetadataFileWriterImpl(final String metadataRootDir, final File[] destinations) {
// The following fields are used to accumulate metadata for all parquet files
private MessageType mergedSchema;
private String mergedCreatedByString;
private final Map<String, String> mergedKeyValueMetaData;
private final List<BlockMetaData> mergedBlocks;

/**
* Per-column type information stored in key-value metadata
*/
private List<ColumnTypeInfo> columnTypes;

/**
* @param metadataRootDir The root directory for the metadata files
* @param destinations The individual parquet file destinations, all of which must be contained in the metadata root
* @param commonSchema The common schema to be included for writing the _common_metadata file.
*/
ParquetMetadataFileWriterImpl(@NotNull final String metadataRootDir, @NotNull final File[] destinations,
@Nullable final MessageType commonSchema) {
for (final File destination : destinations) {
if (!destination.getAbsolutePath().startsWith(metadataRootDir)) {
throw new UncheckedDeephavenException("All destinations must be contained in the provided metadata root"
@@ -63,6 +82,12 @@ private static class ParquetFileMetadata {
this.channelsProvider =
SeekableChannelsProviderLoader.getInstance().fromServiceLoader(convertToURI(rootDir.getPath()), null);
this.columnTypes = null;
this.commonSchema = commonSchema;

this.mergedSchema = null;
this.mergedCreatedByString = null;
this.mergedKeyValueMetaData = new HashMap<>();
this.mergedBlocks = new ArrayList<>();
}
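
A brief sketch of the containment check enforced by this constructor; the paths and the commonSchema variable below are assumptions for illustration only:

// Destinations for a table partitioned on "PartCol"; both live under the metadata root.
final File[] destinations = {
        new File("/data/out/PartCol=A/part0.parquet"),
        new File("/data/out/PartCol=B/part0.parquet")};
final ParquetMetadataFileWriterImpl metadataWriter =
        new ParquetMetadataFileWriterImpl("/data/out", destinations, commonSchema);
// A destination outside "/data/out" would fail the startsWith check above and
// throw an UncheckedDeephavenException.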

/**
@@ -75,19 +100,39 @@ public void addParquetFileMetadata(final File parquetFile, final ParquetMetadata
parquetFileMetadataList.add(new ParquetFileMetadata(parquetFile, metadata));
}

/**
* Write the accumulated metadata to the provided files and clear the metadata accumulated so far.
*
* @param metadataFile The destination file for the _metadata file
* @param commonMetadataFile The destination file for the _common_metadata file
*/
public void writeMetadataFiles(final File metadataFile, final File commonMetadataFile) throws IOException {
final ParquetMetadata metadataFooter = mergeMetadata();
if (parquetFileMetadataList.isEmpty()) {
throw new UncheckedDeephavenException("No parquet files to write metadata for");
}
mergeMetadata();
final ParquetMetadata metadataFooter =
new ParquetMetadata(new FileMetaData(mergedSchema, mergedKeyValueMetaData, mergedCreatedByString),
mergedBlocks);
writeMetadataFile(metadataFooter, metadataFile.getAbsolutePath());

metadataFooter.getBlocks().clear();
writeMetadataFile(metadataFooter, commonMetadataFile.toString());
parquetFileMetadataList.clear();
// Skip the blocks data and merge the schema with the common schema to write the common metadata file.
// The ordering of arguments in this method call is important because we want the common schema to determine
// the overall ordering of the schema fields.
mergedSchema = mergeSchemaInto(mergedSchema, commonSchema);
final ParquetMetadata commonMetadataFooter =
new ParquetMetadata(new FileMetaData(mergedSchema, mergedKeyValueMetaData, mergedCreatedByString),
new ArrayList<>());
writeMetadataFile(commonMetadataFooter, commonMetadataFile.toString());

// Clear the accumulated metadata
clear();
}
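
To illustrate why the common schema is passed as the second argument, here is a self-contained sketch using parquet's schema union as a stand-in for mergeSchemaInto (field names are assumptions; the actual helper's implementation is only partially shown below):

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

public class SchemaOrderSketch {
    public static void main(String[] args) {
        // Per-file schema: data columns only (no partitioning column).
        final MessageType mergedSchema = Types.buildMessage()
                .optional(INT64).named("Value")
                .named("root");
        // Common schema: partitioning column first, then the data columns.
        final MessageType commonSchema = Types.buildMessage()
                .optional(INT32).named("PartCol")
                .optional(INT64).named("Value")
                .named("root");
        // Letting the common schema drive the merge keeps "PartCol" first in the output,
        // which is the ordering the _common_metadata file should advertise.
        final MessageType result = commonSchema.union(mergedSchema);
        System.out.println(result); // prints PartCol before Value
    }
}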

private ParquetMetadata mergeMetadata() throws IOException {
MessageType mergedSchema = null;
final Map<String, String> mergedKeyValueMetaData = new HashMap<>();
final List<BlockMetaData> mergedBlocks = new ArrayList<>();
/**
* Merge all the accumulated metadata for the parquet files.
*/
private void mergeMetadata() throws IOException {
final Collection<String> mergedCreatedBy = new HashSet<>();
for (final ParquetFileMetadata parquetFileMetadata : parquetFileMetadataList) {
final FileMetaData fileMetaData = parquetFileMetadata.metadata.getFileMetaData();
@@ -96,12 +141,14 @@ private ParquetMetadata mergeMetadata() throws IOException {
mergeBlocksInto(parquetFileMetadata, metadataRootDirAbsPath, mergedBlocks);
mergedCreatedBy.add(fileMetaData.getCreatedBy());
}
final String createdByString =
mergedCreatedByString =
mergedCreatedBy.size() == 1 ? mergedCreatedBy.iterator().next() : mergedCreatedBy.toString();
return new ParquetMetadata(new FileMetaData(mergedSchema, mergedKeyValueMetaData, createdByString),
mergedBlocks);
}

/**
* Merge the provided schema into the merged schema. Note that if there are common fields between the two schemas,
* the output schema will have the fields in the order they appear in the merged schema.
*/
private static MessageType mergeSchemaInto(final MessageType schema, final MessageType mergedSchema) {
if (mergedSchema == null) {
return schema;
@@ -126,7 +173,7 @@ private void mergeKeyValueMetaDataInto(final Map<String, String> keyValueMetaDat
}
} else {
// For merging deephaven-specific metadata,
// - groupingColumns, dataIndexes should always be dropped
// - groupingColumns, dataIndexes are skipped
// - version is optional, so we read it from the first file's metadata
// - columnTypes must be the same for all partitions
final TableInfo tableInfo = TableInfo.deserializeFromJSON(entry.getValue());
@@ -135,9 +182,11 @@ private void mergeKeyValueMetaDataInto(final Map<String, String> keyValueMetaDat
Assert.eqNull(columnTypes, "columnTypes");
columnTypes = tableInfo.columnTypes();
mergedKeyValueMetaData.put(ParquetTableWriter.METADATA_KEY,
TableInfo.builder().addAllColumnTypes(columnTypes)
TableInfo.builder()
.addAllColumnTypes(columnTypes)
.version(tableInfo.version())
.build().serializeToJSON());
.build()
.serializeToJSON());
} else if (!columnTypes.equals(tableInfo.columnTypes())) {
throw new UncheckedDeephavenException("Could not merge metadata for key " +
ParquetTableWriter.METADATA_KEY + ", has conflicting values for columnTypes: " +
@@ -174,6 +223,10 @@ private void writeMetadataFile(final ParquetMetadata metadataFooter, final Strin

public void clear() {
parquetFileMetadataList.clear();
mergedKeyValueMetaData.clear();
mergedBlocks.clear();
columnTypes = null;
mergedSchema = null;
mergedCreatedByString = null;
}
}
@@ -36,6 +36,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.jetbrains.annotations.NotNull;
@@ -229,6 +230,22 @@ private static void write(
parquetFileWriter.close();
}

/**
* Get the parquet schema for a table
*
* @param table the input table
* @param definition the table definition
* @param instructions write instructions for the file
* @return the parquet schema
*/
static MessageType getSchemaForTable(@NotNull final Table table,
@NotNull final TableDefinition definition,
@NotNull final ParquetInstructions instructions) {
final Table pretransformTable = pretransformTable(table, definition);
return MappedSchema.create(new HashMap<>(), definition, pretransformTable.getRowSet(),
pretransformTable.getColumnSourceMap(), instructions).getParquetSchema();
}
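
A hedged sketch of how this helper could feed the common-schema plumbing added to ParquetInstructions above; sourceTable, mergedDefinition, and writeInstructions are assumed names, not part of this change:

// Derive the schema for the overall definition, then attach it to the write
// instructions so the _common_metadata file can be written with that field ordering.
final MessageType commonSchema =
        getSchemaForTable(sourceTable, mergedDefinition, writeInstructions);
final ParquetInstructions instructionsWithSchema =
        ParquetInstructions.createWithCommonSchema(writeInstructions, commonSchema);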

/**
* Detect any missing or StringSet columns and convert them to arrays / null values as appropriate to prepare the
* input table to be written to the parquet file.
