diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java
index 5f1243483d3..f23e7304b63 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java
@@ -239,10 +239,9 @@ private static void write(
      * @return the parquet schema
      */
     static MessageType getSchemaForTable(@NotNull final Table table,
-            @NotNull final TableDefinition definition,
             @NotNull final ParquetInstructions instructions) {
-        final Table pretransformTable = pretransformTable(table, definition);
-        return MappedSchema.create(new HashMap<>(), definition, pretransformTable.getRowSet(),
+        final Table pretransformTable = pretransformTable(table, table.getDefinition());
+        return MappedSchema.create(new HashMap<>(), table.getDefinition(), pretransformTable.getRowSet(),
                 pretransformTable.getColumnSourceMap(), instructions).getParquetSchema();
     }
 
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
index 5642f990057..69f9e4c986e 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
@@ -57,6 +57,7 @@
 import java.nio.file.Path;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.*;
+import java.util.stream.Collectors;
 
 import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;
 import static io.deephaven.parquet.table.ParquetTableWriter.getSchemaForTable;
@@ -464,9 +465,9 @@ private static Map groupin
     }
 
     /**
-     * Write tables to disk in parquet format with partitioning columns written as "key=value" format in a nested
-     * directory structure. To generate these individual partitions, this method will call
-     * {@link Table#partitionBy(String...) partitionBy} on all the partitioning columns.
+     * Write a table to disk in parquet format with {@link TableDefinition#getPartitioningColumns() partitioning columns}
+     * written as "key=value" format in a nested directory structure. To generate these individual partitions, this
+     * method will call {@link Table#partitionBy(String...) partitionBy} on all the partitioning columns.
      *
      * @param sourceTable The table to partition and write
      * @param destinationDir The destination directory to store partitioned data in. Non-existing directories are
@@ -480,9 +481,8 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl
             @NotNull final String baseName,
             @NotNull final ParquetInstructions writeInstructions) {
         // TODO Should I take an additional write instruction for partitioning column names in case they are different
-        // from the table's partitioning columns? Also, I couldn't find a doc explaining how a user can convert a
-        // regular
-        // column to a partitioning column. So would be easier for users to just pass the partitioning column names.
+        // from the table's partitioning columns? Also, should I take partitioning column names from the user as an
+        // argument to this method?
         // Also, how should I get the baseName, should I move it to the write instructions.
         // Pyarrow has this optional parameter "basename_template" used to generate basenames of written data files.
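A minimal usage sketch of the single-table overload documented above, assuming a table whose definition declares
partitioning columns (paths and column names are hypothetical; the QueryTable cast plus withDefinitionUnsafe mirrors
the pattern in the test file changed below and is not part of this diff):

    final TableDefinition definition = TableDefinition.of(
            ColumnDefinition.ofString("symbol").withPartitioning(),
            ColumnDefinition.ofLong("I"));
    // Hypothetical source table; any table carrying this definition works.
    final Table source = ((QueryTable) TableTools.emptyTable(10)
            .updateView("symbol = (i % 2 == 0) ? `AA` : `BB`", "I = ii"))
            .withDefinitionUnsafe(definition);
    // Expected layout: /tmp/partitioned/symbol=AA/data.parquet and /tmp/partitioned/symbol=BB/data.parquet
    ParquetTools.writeKeyValuePartitionedTable(
            source, new File("/tmp/partitioned"), "data", ParquetInstructions.EMPTY);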
@@ -490,49 +490,87 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl
         // If not specified, it defaults to "someHash-{i}.parquet".
         // pyspark has names of the form "part.{i}.parquet" and allows passing a naming function which takes an integer
         // and generates a file name.
-
-        final TableDefinition sourceTableDefinition = sourceTable.getDefinition();
-        final List<ColumnDefinition<?>> partitioningColumns = sourceTableDefinition.getPartitioningColumns();
+        final List<ColumnDefinition<?>> partitioningColumns = sourceTable.getDefinition().getPartitioningColumns();
         if (partitioningColumns.isEmpty()) {
             throw new IllegalArgumentException("Table must have partitioning columns to write partitioned data");
         }
         final String[] partitioningColNames = partitioningColumns.stream()
                 .map(ColumnDefinition::getName)
                 .toArray(String[]::new);
-        final PartitionedTable partitionedData = sourceTable.partitionBy(partitioningColNames);
-        final Table keyTable = sourceTable.selectDistinct(partitioningColNames);
-        final Collection<Table> partitionedTables = new ArrayList<>();
+        final PartitionedTable partitionedTable = sourceTable.partitionBy(partitioningColNames);
+        writeKeyValuePartitionedTable(partitionedTable, destinationDir, baseName, writeInstructions);
+    }
+
+    /**
+     * Write a partitioned table to disk in parquet format with all the {@link PartitionedTable#keyColumnNames() key
+     * columns} written in "key=value" format in a nested directory structure. To generate the partitioned table, users
+     * can call {@link Table#partitionBy(String...) partitionBy} on the required columns.
+     *
+     * @param partitionedTable The partitioned table to write
+     * @param destinationDir The destination directory to store partitioned data in. Non-existing directories are
+     *        created.
+     * @param baseName The base name for the individual partitioned tables. For example, a base name of "table" will
+     *        result in files named "partition1/table.parquet", "partition2/table.parquet", etc.
+     * @param writeInstructions Write instructions for customizations while writing
+     */
+    public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable partitionedTable,
+            @NotNull final File destinationDir,
+            @NotNull final String baseName,
+            @NotNull final ParquetInstructions writeInstructions) {
+        final String[] partitioningColumnNames = partitionedTable.keyColumnNames().toArray(String[]::new);
+        final Table keyTable;
+        if (partitionedTable.uniqueKeys()) {
+            keyTable = partitionedTable.table().view(partitioningColumnNames);
+        } else {
+            keyTable = partitionedTable.table().selectDistinct(partitioningColumnNames);
+        }
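+        // keyTable has one row per unique partition key; the loop below derives, for each row, a
+        // "key=value" destination path and the matching constituent table, index-aligned.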
+        final Collection<Table> constituentTables = new ArrayList<>();
         final Collection<File> destinations = new ArrayList<>();
         keyTable.getRowSet().forAllRowKeys(key -> {
-            final Object[] keyValues = Arrays.stream(partitioningColNames)
+            final Object[] keyValues = Arrays.stream(partitioningColumnNames)
                     .map(keyTable::getColumnSource)
                     .map(colSource -> colSource.get(key))
                     .toArray();
             final StringBuilder partitionTableRelativePath = new StringBuilder();
-            for (int i = 0; i < partitioningColNames.length; i++) {
+            for (int i = 0; i < partitioningColumnNames.length; i++) {
                 partitionTableRelativePath.append(File.separator)
-                        .append(partitioningColNames[i]).append("=").append(keyValues[i]);
+                        .append(partitioningColumnNames[i]).append("=").append(keyValues[i]);
             }
             partitionTableRelativePath.append(File.separator).append(baseName).append(".parquet");
             destinations.add(new File(destinationDir, partitionTableRelativePath.toString()));
-            partitionedTables.add(partitionedData.constituentFor(keyValues));
+            constituentTables.add(partitionedTable.constituentFor(keyValues));
         });
-        // If needed, generate schema for _common_metadata file from source table
+        // If needed, generate schema for _common_metadata file from key table
         final ParquetInstructions updatedWriteInstructions;
         if (!ParquetInstructions.DEFAULT_METADATA_ROOT_DIR.equals(writeInstructions.getMetadataRootDir())) {
-            final MessageType commonSchema = getSchemaForTable(sourceTable, sourceTableDefinition, writeInstructions);
+            final MessageType commonSchema = getSchemaForTable(keyTable, writeInstructions);
             updatedWriteInstructions = ParquetInstructions.createWithCommonSchema(writeInstructions, commonSchema);
         } else {
             updatedWriteInstructions = writeInstructions;
         }
-        final TableDefinition partitionedTableDefinition = sourceTableDefinition.getWritable(false);
+        final TableDefinition constituentDefinition = getNonKeyTableDefinition(partitionedTable);
         ParquetTools.writeParquetTables(
-                partitionedTables.toArray(Table[]::new),
-                partitionedTableDefinition,
+                constituentTables.toArray(Table[]::new),
+                constituentDefinition,
                 updatedWriteInstructions,
                 destinations.toArray(File[]::new),
-                partitionedTableDefinition.getGroupingColumnNamesArray());
+                constituentDefinition.getGroupingColumnNamesArray());
+    }
+
+    /**
+     * Create a table definition for the non-key columns of a partitioned table.
+     */
+    private static TableDefinition getNonKeyTableDefinition(@NotNull final PartitionedTable partitionedTable) {
+        final Collection<String> keyColumnNames = partitionedTable.keyColumnNames();
+        final List<ColumnDefinition<?>> nonKeyColumnDefinition =
+                partitionedTable.constituentDefinition()
+                        .getColumns().stream()
+                        .filter(columnDefinition -> {
+                            final String columnName = columnDefinition.getName();
+                            return !keyColumnNames.contains(columnName);
+                        }).collect(Collectors.toList());
+        return TableDefinition.of(nonKeyColumnDefinition);
     }
 
     /**
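A minimal usage sketch of the new PartitionedTable overload, assuming an input table like the one in the test below
(paths are hypothetical; partitionBy is the standard way to produce the partitioned table):

    final PartitionedTable byKeys = inputData.partitionBy("symbol", "epic_collection_id");
    // Expected layout: /tmp/partitioned/symbol=AA/epic_collection_id=0/data.parquet, etc.
    ParquetTools.writeKeyValuePartitionedTable(
            byKeys, new File("/tmp/partitioned"), "data", ParquetInstructions.EMPTY);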
diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
index fbe3c24bfde..b8405605482 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
@@ -15,6 +15,7 @@
 import io.deephaven.engine.primitive.iterator.CloseableIterator;
 import io.deephaven.engine.table.ColumnDefinition;
 import io.deephaven.engine.table.ColumnSource;
+import io.deephaven.engine.table.PartitionedTable;
 import io.deephaven.engine.table.impl.SourceTable;
 import io.deephaven.engine.table.impl.select.FunctionalColumn;
 import io.deephaven.engine.table.impl.select.SelectColumn;
@@ -620,8 +621,8 @@ public void writeKeyValuePartitionedDataWithMixedPartitionsTest() {
     public void someMoreKeyValuePartitionedTestsWithComplexKeys() {
         final TableDefinition definition = TableDefinition.of(
                 ColumnDefinition.ofString("symbol").withPartitioning(),
-                ColumnDefinition.ofString("epic_collection_id").withPartitioning(),
-                ColumnDefinition.ofString("epic_request_id").withPartitioning(),
+                ColumnDefinition.ofString("epic_collection_id"),
+                ColumnDefinition.ofString("epic_request_id"),
                 ColumnDefinition.ofLong("I"));
         final Table inputData = ((QueryTable) TableTools.emptyTable(10)
                 .updateView("symbol = (i % 2 == 0) ? `AA` : `BB`",
@@ -634,7 +635,9 @@ public void someMoreKeyValuePartitionedTestsWithComplexKeys() {
         final ParquetInstructions writeInstructions = ParquetInstructions.builder()
                 .setMetadataRootDir(parentDir.getAbsolutePath())
                 .build();
-        writeKeyValuePartitionedTable(inputData, parentDir, "data", writeInstructions);
+        final PartitionedTable partitionedTable =
+                inputData.partitionBy("symbol", "epic_collection_id", "epic_request_id");
+        writeKeyValuePartitionedTable(partitionedTable, parentDir, "data", writeInstructions);
         final Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY);
         assertTableEquals(inputData.sort("symbol", "epic_collection_id"),
                 fromDisk.sort("symbol", "epic_collection_id"));