From 1f41eefa83f597450c1383d6858a670d73605360 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Tue, 27 Feb 2024 13:56:35 -0600 Subject: [PATCH] Added support for partitioned parquet writing --- .../parquet/table/ParquetInstructions.java | 61 ++++++++++++- .../table/ParquetMetadataFileWriterImpl.java | 85 +++++++++++++++---- .../parquet/table/ParquetTableWriter.java | 17 ++++ .../deephaven/parquet/table/ParquetTools.java | 81 +++++++++++++++++- .../table/ParquetTableReadWriteTest.java | 67 ++++++++++++++- 5 files changed, 287 insertions(+), 24 deletions(-) diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java index 2cc99bad99d..ffe2e45b888 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetInstructions.java @@ -10,6 +10,7 @@ import io.deephaven.hash.KeyedObjectKey; import io.deephaven.util.annotations.VisibleForTesting; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -117,8 +118,46 @@ public static int getDefaultTargetPageSize() { static final String DEFAULT_METADATA_ROOT_DIR = ""; // Empty = No metadata files written + private static final MessageType DEFAULT_COMMON_SCHEMA = null; // No common schema + public ParquetInstructions() {} + /** + * Create a new ParquetInstructions object by copying all fields from the base instructions, and then adding the + * provided common schema. This method is intended for use internally and therefore is package-private. + * + * @param baseInstructions The base instructions to copy from + * @param commonSchema The common schema to add + * @return A new ParquetInstructions object with the common schema added + */ + static ParquetInstructions createWithCommonSchema(@NotNull final ParquetInstructions baseInstructions, + @NotNull final MessageType commonSchema) { + if (!(baseInstructions instanceof ReadOnly)) { + throw new UnsupportedOperationException("Cannot add common schema to non-ReadOnly ParquetInstructions"); + } + if (baseInstructions.getCommonSchema() != DEFAULT_COMMON_SCHEMA) { + throw new UnsupportedOperationException("Cannot add common schema to ParquetInstructions with existing " + + "common schema"); + } + if (DEFAULT_METADATA_ROOT_DIR.equals(baseInstructions.getMetadataRootDir())) { + throw new UnsupportedOperationException("Cannot add common schema to ParquetInstructions without any" + + " metadata root directory"); + } + final ReadOnly readOnly = (ReadOnly) baseInstructions; + return new ReadOnly( + readOnly.copyColumnNameToInstructions(), + readOnly.copyParquetColumnNameToInstructions(), + readOnly.getCompressionCodecName(), + readOnly.getMaximumDictionaryKeys(), + readOnly.getMaximumDictionarySize(), + readOnly.isLegacyParquet(), + readOnly.getTargetPageSize(), + readOnly.isRefreshing(), + readOnly.getSpecialInstructions(), + readOnly.getMetadataRootDir(), + commonSchema); + } + public final String getColumnNameFromParquetColumnNameOrDefault(final String parquetColumnName) { final String mapped = getColumnNameFromParquetColumnName(parquetColumnName); return (mapped != null) ? 
mapped : parquetColumnName; @@ -171,6 +210,11 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par */ public abstract String getMetadataRootDir(); + /** + * @return the common schema used for writing _common_metadata file. + */ + abstract MessageType getCommonSchema(); + @VisibleForTesting public static boolean sameColumnNamesAndCodecMappings(final ParquetInstructions i1, final ParquetInstructions i2) { if (i1 == EMPTY) { @@ -250,6 +294,11 @@ public boolean isRefreshing() { public String getMetadataRootDir() { return DEFAULT_METADATA_ROOT_DIR; } + + @Override + public MessageType getCommonSchema() { + return DEFAULT_COMMON_SCHEMA; + } }; private static class ColumnInstructions { @@ -319,6 +368,7 @@ private static final class ReadOnly extends ParquetInstructions { private final boolean isRefreshing; private final Object specialInstructions; private final String metadataRootDir; + private final MessageType commonSchema; private ReadOnly( final KeyedObjectHashMap columnNameToInstructions, @@ -330,7 +380,8 @@ private ReadOnly( final int targetPageSize, final boolean isRefreshing, final Object specialInstructions, - final String metadataRootDir) { + final String metadataRootDir, + final MessageType commonSchema) { this.columnNameToInstructions = columnNameToInstructions; this.parquetColumnNameToInstructions = parquetColumnNameToColumnName; this.compressionCodecName = compressionCodecName; @@ -341,6 +392,7 @@ private ReadOnly( this.isRefreshing = isRefreshing; this.specialInstructions = specialInstructions; this.metadataRootDir = metadataRootDir; + this.commonSchema = commonSchema; } private String getOrDefault(final String columnName, final String defaultValue, @@ -439,6 +491,11 @@ public String getMetadataRootDir() { return metadataRootDir; } + @Override + public MessageType getCommonSchema() { + return commonSchema; + } + KeyedObjectHashMap copyColumnNameToInstructions() { // noinspection unchecked return (columnNameToInstructions == null) @@ -686,7 +743,7 @@ public ParquetInstructions build() { parquetColumnNameToInstructions = null; return new ReadOnly(columnNameToInstructionsOut, parquetColumnNameToColumnNameOut, compressionCodecName, maximumDictionaryKeys, maximumDictionarySize, isLegacyParquet, targetPageSize, isRefreshing, - specialInstructions, metadataRootDir); + specialInstructions, metadataRootDir, DEFAULT_COMMON_SCHEMA); } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetMetadataFileWriterImpl.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetMetadataFileWriterImpl.java index 2ce81e4ff28..7a39be5308a 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetMetadataFileWriterImpl.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetMetadataFileWriterImpl.java @@ -14,6 +14,8 @@ import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.io.File; import java.io.IOException; @@ -47,9 +49,26 @@ private static class ParquetFileMetadata { private final String metadataRootDirAbsPath; private final List parquetFileMetadataList; private final SeekableChannelsProvider channelsProvider; - private List columnTypes; // Useful when merging deephaven specific metadata + private final MessageType commonSchema; - 
ParquetMetadataFileWriterImpl(final String metadataRootDir, final File[] destinations) { + // The following fields are used to accumulate metadata for all parquet files + private MessageType mergedSchema; + private String mergedCreatedByString; + private final Map mergedKeyValueMetaData; + private final List mergedBlocks; + + /** + * Per-column type information stored in key-value metadata + */ + private List columnTypes; + + /** + * @param metadataRootDir The root directory for the metadata files + * @param destinations The individual parquet file destinations, all of which must be contained in the metadata root + * @param commonSchema The common schema to be included for writing the _common_metadata file. + */ + ParquetMetadataFileWriterImpl(@NotNull final String metadataRootDir, @NotNull final File[] destinations, + @Nullable final MessageType commonSchema) { for (final File destination : destinations) { if (!destination.getAbsolutePath().startsWith(metadataRootDir)) { throw new UncheckedDeephavenException("All destinations must be contained in the provided metadata root" @@ -63,6 +82,12 @@ private static class ParquetFileMetadata { this.channelsProvider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(convertToURI(rootDir.getPath()), null); this.columnTypes = null; + this.commonSchema = commonSchema; + + this.mergedSchema = null; + this.mergedCreatedByString = null; + this.mergedKeyValueMetaData = new HashMap<>(); + this.mergedBlocks = new ArrayList<>(); } /** @@ -75,19 +100,39 @@ public void addParquetFileMetadata(final File parquetFile, final ParquetMetadata parquetFileMetadataList.add(new ParquetFileMetadata(parquetFile, metadata)); } + /** + * Write the accumulated metadata to the provided files and clear the metadata accumulated so far. + * + * @param metadataFile The destination file for the _metadata file + * @param commonMetadataFile The destination file for the _common_metadata file + */ public void writeMetadataFiles(final File metadataFile, final File commonMetadataFile) throws IOException { - final ParquetMetadata metadataFooter = mergeMetadata(); + if (parquetFileMetadataList.isEmpty()) { + throw new UncheckedDeephavenException("No parquet files to write metadata for"); + } + mergeMetadata(); + final ParquetMetadata metadataFooter = + new ParquetMetadata(new FileMetaData(mergedSchema, mergedKeyValueMetaData, mergedCreatedByString), + mergedBlocks); writeMetadataFile(metadataFooter, metadataFile.getAbsolutePath()); - metadataFooter.getBlocks().clear(); - writeMetadataFile(metadataFooter, commonMetadataFile.toString()); - parquetFileMetadataList.clear(); + // Skip the blocks data and merge the schema with the common schema to write the common metadata file. + // The ordering of arguments in the method call is important because we want the common schema to determine the + // overall ordering of the schema fields. + mergedSchema = mergeSchemaInto(mergedSchema, commonSchema); + final ParquetMetadata commonMetadataFooter = + new ParquetMetadata(new FileMetaData(mergedSchema, mergedKeyValueMetaData, mergedCreatedByString), + new ArrayList<>()); + writeMetadataFile(commonMetadataFooter, commonMetadataFile.toString()); + + // Clear the accumulated metadata + clear(); } - private ParquetMetadata mergeMetadata() throws IOException { - MessageType mergedSchema = null; - final Map mergedKeyValueMetaData = new HashMap<>(); - final List mergedBlocks = new ArrayList<>(); + /** + * Merge all the accumulated metadata for the parquet files. 
+ */ + private void mergeMetadata() throws IOException { final Collection mergedCreatedBy = new HashSet<>(); for (final ParquetFileMetadata parquetFileMetadata : parquetFileMetadataList) { final FileMetaData fileMetaData = parquetFileMetadata.metadata.getFileMetaData(); @@ -96,12 +141,14 @@ private ParquetMetadata mergeMetadata() throws IOException { mergeBlocksInto(parquetFileMetadata, metadataRootDirAbsPath, mergedBlocks); mergedCreatedBy.add(fileMetaData.getCreatedBy()); } - final String createdByString = + mergedCreatedByString = mergedCreatedBy.size() == 1 ? mergedCreatedBy.iterator().next() : mergedCreatedBy.toString(); - return new ParquetMetadata(new FileMetaData(mergedSchema, mergedKeyValueMetaData, createdByString), - mergedBlocks); } + /** + * Merge the provided schema into the merged schema. Note that if there are common fields between the two schemas, + * the output schema will have the fields in the order they appear in the merged schema. + */ private static MessageType mergeSchemaInto(final MessageType schema, final MessageType mergedSchema) { if (mergedSchema == null) { return schema; @@ -126,7 +173,7 @@ private void mergeKeyValueMetaDataInto(final Map keyValueMetaDat } } else { // For merging deephaven-specific metadata, - // - groupingColumns, dataIndexes should always be dropped + // - groupingColumns, dataIndexes are skipped // - version is optional, so we read it from the first file's metadata // - columnTypes must be the same for all partitions final TableInfo tableInfo = TableInfo.deserializeFromJSON(entry.getValue()); @@ -135,9 +182,11 @@ private void mergeKeyValueMetaDataInto(final Map keyValueMetaDat Assert.eqNull(columnTypes, "columnTypes"); columnTypes = tableInfo.columnTypes(); mergedKeyValueMetaData.put(ParquetTableWriter.METADATA_KEY, - TableInfo.builder().addAllColumnTypes(columnTypes) + TableInfo.builder() + .addAllColumnTypes(columnTypes) .version(tableInfo.version()) - .build().serializeToJSON()); + .build() + .serializeToJSON()); } else if (!columnTypes.equals(tableInfo.columnTypes())) { throw new UncheckedDeephavenException("Could not merge metadata for key " + ParquetTableWriter.METADATA_KEY + ", has conflicting values for columnTypes: " + @@ -174,6 +223,10 @@ private void writeMetadataFile(final ParquetMetadata metadataFooter, final Strin public void clear() { parquetFileMetadataList.clear(); + mergedKeyValueMetaData.clear(); + mergedBlocks.clear(); columnTypes = null; + mergedSchema = null; + mergedCreatedByString = null; } } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java index d92ca09d19b..5f1243483d3 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java @@ -36,6 +36,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; import org.jetbrains.annotations.NotNull; @@ -229,6 +230,22 @@ private static void write( parquetFileWriter.close(); } + /** + * Get the parquet schema for a table + * + * @param table the input table + * @param definition the table definition + * @param instructions write instructions for the file + * 
@return the parquet schema */ static MessageType getSchemaForTable(@NotNull final Table table, + @NotNull final TableDefinition definition, + @NotNull final ParquetInstructions instructions) { + final Table pretransformTable = pretransformTable(table, definition); + return MappedSchema.create(new HashMap<>(), definition, pretransformTable.getRowSet(), + pretransformTable.getColumnSourceMap(), instructions).getParquetSchema(); + } + /** * Detect any missing or StringSet columns and convert them to arrays / null values as appropriate to prepare the * input table to be written to the parquet file. diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java index 0e14af40ff9..5642f990057 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java @@ -10,6 +10,7 @@ import io.deephaven.base.verify.Require; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.ColumnDefinition; +import io.deephaven.engine.table.PartitionedTable; import io.deephaven.engine.table.Table; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.locations.util.TableDataRefreshService; @@ -58,6 +59,7 @@ import java.util.*; import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME; +import static io.deephaven.parquet.table.ParquetTableWriter.getSchemaForTable; import static io.deephaven.util.channel.SeekableChannelsProvider.convertToURI; import static io.deephaven.parquet.table.ParquetTableWriter.PARQUET_FILE_EXTENSION; import static io.deephaven.util.type.TypeUtils.getUnboxedTypeIfBoxed; @@ -461,15 +463,87 @@ private static Map groupin return gcwim; } + /** + * Write tables to disk in parquet format with partitioning columns written in "key=value" format in a nested + * directory structure. To generate these individual partitions, this method will call + * {@link Table#partitionBy(String...) partitionBy} on all the partitioning columns. + * + * @param sourceTable The table to partition and write + * @param destinationDir The destination directory to store partitioned data in. Non-existing directories are + * created. + * @param baseName The base name for the individual partitioned tables. For example, a base name of "table" will + * result in files named "partition1/table.parquet", "partition2/table.parquet", etc. + * @param writeInstructions Write instructions for customizations while writing + */ + public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTable, + @NotNull final File destinationDir, + @NotNull final String baseName, + @NotNull final ParquetInstructions writeInstructions) { + // TODO Should I take an additional write instruction for partitioning column names in case they are different + // from the table's partitioning columns? Also, I couldn't find a doc explaining how a user can convert a + // regular column to a partitioning column. So it would be easier for users to just pass the partitioning + // column names. + + // Also, how should I get the baseName? Should I move it to the write instructions? + // Pyarrow has this optional parameter "basename_template" used to generate basenames of written data files. + // The token ‘{i}’ will be replaced with an automatically incremented integer for files in the same folder. 
+ // If not specified, it defaults to “someHash-{i}.parquet”. + // pyspark has names of the form "part.{i}.parquet" and allows passing a naming function which takes an integer + // and generates a file name. + + final TableDefinition sourceTableDefinition = sourceTable.getDefinition(); + final List> partitioningColumns = sourceTableDefinition.getPartitioningColumns(); + if (partitioningColumns.isEmpty()) { + throw new IllegalArgumentException("Table must have partitioning columns to write partitioned data"); + } + final String[] partitioningColNames = partitioningColumns.stream() + .map(ColumnDefinition::getName) + .toArray(String[]::new); + final PartitionedTable partitionedData = sourceTable.partitionBy(partitioningColNames); + final Table keyTable = sourceTable.selectDistinct(partitioningColNames); + final Collection partitionedTables = new ArrayList<>(); + final Collection destinations = new ArrayList<>(); + keyTable.getRowSet().forAllRowKeys(key -> { + final Object[] keyValues = Arrays.stream(partitioningColNames) + .map(keyTable::getColumnSource) + .map(colSource -> colSource.get(key)) + .toArray(); + final StringBuilder partitionTableRelativePath = new StringBuilder(); + for (int i = 0; i < partitioningColNames.length; i++) { + partitionTableRelativePath.append(File.separator) + .append(partitioningColNames[i]).append("=").append(keyValues[i]); + } + partitionTableRelativePath.append(File.separator).append(baseName).append(".parquet"); + destinations.add(new File(destinationDir, partitionTableRelativePath.toString())); + partitionedTables.add(partitionedData.constituentFor(keyValues)); + }); + + // If needed, generate schema for _common_metadata file from source table + final ParquetInstructions updatedWriteInstructions; + if (!ParquetInstructions.DEFAULT_METADATA_ROOT_DIR.equals(writeInstructions.getMetadataRootDir())) { + final MessageType commonSchema = getSchemaForTable(sourceTable, sourceTableDefinition, writeInstructions); + updatedWriteInstructions = ParquetInstructions.createWithCommonSchema(writeInstructions, commonSchema); + } else { + updatedWriteInstructions = writeInstructions; + } + final TableDefinition partitionedTableDefinition = sourceTableDefinition.getWritable(false); + ParquetTools.writeParquetTables( + partitionedTables.toArray(Table[]::new), + partitionedTableDefinition, + updatedWriteInstructions, + destinations.toArray(File[]::new), + partitionedTableDefinition.getGroupingColumnNamesArray()); + } + /** * Writes tables to disk in parquet format to a supplied set of destinations. If you specify grouping columns, there * must already be grouping information for those columns in the sources. This can be accomplished with * {@code .groupBy().ungroup()} or {@code .sort()}. * * @param sources The tables to write - * @param definition The common schema for all the tables to write + * @param definition The common definition for all the tables to write * @param writeInstructions Write instructions for customizations while writing - * @param destinations The destinations paths. Any non-existing directories in the paths provided are created. If + * @param destinations The destination paths. Any non-existing directories in the paths provided are created. 
If * there is an error any intermediate directories previously created are removed; note this makes this method * unsafe for concurrent use * @param groupingColumns List of columns the tables are grouped by (the write operation will store the grouping @@ -497,7 +571,8 @@ public static void writeParquetTables(@NotNull final Table[] sources, final String metadataRootDir = writeInstructions.getMetadataRootDir(); final boolean writeMetadataFiles = !ParquetInstructions.DEFAULT_METADATA_ROOT_DIR.equals(metadataRootDir); if (writeMetadataFiles) { - metadataFileWriter = new ParquetMetadataFileWriterImpl(metadataRootDir, destinations); + metadataFileWriter = new ParquetMetadataFileWriterImpl(metadataRootDir, destinations, + writeInstructions.getCommonSchema()); } else { metadataFileWriter = NullParquetMetadataFileWriter.INSTANCE; } diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 818c793a559..ad9f4ee6bc4 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -16,7 +16,6 @@ import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.impl.SourceTable; -import io.deephaven.engine.table.impl.locations.TableDataException; import io.deephaven.engine.table.impl.select.FunctionalColumn; import io.deephaven.engine.table.impl.select.SelectColumn; import io.deephaven.engine.table.impl.sources.ReinterpretUtils; @@ -107,6 +106,7 @@ import static io.deephaven.parquet.table.ParquetTools.readSingleFileTable; import static io.deephaven.parquet.table.ParquetTools.readTable; import static io.deephaven.parquet.table.ParquetTools.writeParquetTables; +import static io.deephaven.parquet.table.ParquetTools.writeKeyValuePartitionedTable; import static io.deephaven.parquet.table.ParquetTools.writeTable; import static io.deephaven.util.QueryConstants.*; import static org.junit.Assert.*; @@ -516,7 +516,7 @@ public void parquetWithGroupingDataAndMetadataTest() { } @Test - public void flatPartitionedParquetWithMetadataTest() { + public void flatPartitionedParquetWithMetadataTest() throws IOException { // Create an empty parent directory final File parentDir = new File(rootFile, "tempDir"); parentDir.mkdir(); @@ -529,7 +529,7 @@ public void flatPartitionedParquetWithMetadataTest() { // Write without any metadata files writeParquetTables(new Table[] {someTable, someTable}, someTable.getDefinition(), ParquetInstructions.EMPTY, new File[] {firstDataFile, secondDataFile}, null); - final Table source = readTable(parentDir); + final Table source = readTable(parentDir).select(); // Now write with metadata files parentDir.delete(); @@ -546,6 +546,67 @@ public void flatPartitionedParquetWithMetadataTest() { final File metadataFile = new File(parentDir, "_metadata"); final Table fromDiskWithMetadata = readTable(metadataFile); assertTableEquals(source, fromDiskWithMetadata); + + // Now replace the underlying data files with empty files and read the size from metadata file verifying that + // we can read the size without touching the data + firstDataFile.delete(); + firstDataFile.createNewFile(); + secondDataFile.delete(); + secondDataFile.createNewFile(); + final Table fromDiskWithMetadataWithoutData = readTable(metadataFile); + 
assertEquals(source.size(), fromDiskWithMetadataWithoutData.size()); + } + + @Test + public void writeKeyValuePartitionedDataWithIntegerPartitionsTest() { + final TableDefinition definition = TableDefinition.of( + ColumnDefinition.ofInt("PC1").withPartitioning(), + ColumnDefinition.ofInt("PC2").withPartitioning(), + ColumnDefinition.ofLong("I")); + final Table inputData = ((QueryTable) TableTools.emptyTable(20) + .updateView("PC1 = (int)(ii%3)", + "PC2 = (int)(ii%2)", + "I = ii")) + .withDefinitionUnsafe(definition); + + final File parentDir = new File(rootFile, "writeKeyValuePartitionedDataTest"); + final ParquetInstructions writeInstructions = ParquetInstructions.builder() + .setMetadataRootDir(parentDir.getAbsolutePath()) + .build(); + writeKeyValuePartitionedTable(inputData, parentDir, "data", writeInstructions); + final Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY); + assertTableEquals(inputData.sort("PC1", "PC2"), fromDisk.sort("PC1", "PC2")); + + final File commonMetadata = new File(parentDir, "_common_metadata"); + final Table fromDiskWithMetadata = readTable(commonMetadata); + assertTableEquals(inputData.sort("PC1", "PC2"), fromDiskWithMetadata.sort("PC1", "PC2")); + } + + @Test + public void writeKeyValuePartitionedDataWithMixedPartitionsTest() { + final TableDefinition definition = TableDefinition.of( + ColumnDefinition.ofInt("PC1").withPartitioning(), + ColumnDefinition.ofChar("PC2").withPartitioning(), + ColumnDefinition.ofString("PC3").withPartitioning(), + ColumnDefinition.ofLong("I")); + final Table inputData = ((QueryTable) TableTools.emptyTable(10) + .updateView("PC1 = (int)(ii%3)", + "PC2 = (char)(65 + (ii % 2))", + "PC3 = java.time.LocalDate.ofEpochDay(i%5).toString()", + "I = ii")) + .withDefinitionUnsafe(definition); + + final File parentDir = new File(rootFile, "writeKeyValuePartitionedDataTest"); + final ParquetInstructions writeInstructions = ParquetInstructions.builder() + .setMetadataRootDir(parentDir.getAbsolutePath()) + .build(); + writeKeyValuePartitionedTable(inputData, parentDir, "data", writeInstructions); + final Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY); + assertTableEquals(inputData.sort("PC1", "PC2"), fromDisk.sort("PC1", "PC2")); + + final File commonMetadata = new File(parentDir, "_common_metadata"); + final Table fromDiskWithMetadata = readTable(commonMetadata); + assertTableEquals(inputData.sort("PC1", "PC2"), fromDiskWithMetadata.sort("PC1", "PC2")); } @Test
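/*
 * Usage sketch for the partitioned-writing API exercised by the tests above (a minimal illustration;
 * `source` and `outputDir` are placeholder names, not part of this patch). A table whose definition
 * declares partitioning columns is split via partitionBy and written under key=value directories, and
 * setting a metadata root directory additionally produces the _metadata and _common_metadata files,
 * which can be read back directly:
 *
 *   final ParquetInstructions instructions = ParquetInstructions.builder()
 *           .setMetadataRootDir(outputDir.getAbsolutePath()) // also enables _metadata/_common_metadata
 *           .build();
 *   writeKeyValuePartitionedTable(source, outputDir, "data", instructions);
 *   final Table fromDisk = readKeyValuePartitionedTable(outputDir, ParquetInstructions.EMPTY);
 *   final Table fromCommonMetadata = readTable(new File(outputDir, "_common_metadata"));
 */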