Added a new API for writing a partitioned table directly
malhotrashivam committed Feb 28, 2024
1 parent ff99a36 commit 3e48937
Showing 3 changed files with 68 additions and 28 deletions.
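
The commit adds a second overload of writeKeyValuePartitionedTable: instead of handing it a source Table and letting it derive the partitions, callers can now pass a PartitionedTable directly. A minimal usage sketch, adapted from the test added in this commit (the data and destination path are illustrative, and the assumption that the new static method lives on ParquetTools follows from the surrounding imports, not from the diff itself; a live Deephaven engine context is assumed):

    import io.deephaven.engine.table.PartitionedTable;
    import io.deephaven.engine.table.Table;
    import io.deephaven.engine.util.TableTools;
    import io.deephaven.parquet.table.ParquetInstructions;
    import io.deephaven.parquet.table.ParquetTools;
    import java.io.File;

    // Build a small table and partition it by the desired key column(s).
    final Table inputData = TableTools.emptyTable(10)
            .updateView("symbol = (i % 2 == 0) ? `AA` : `BB`", "I = ii");
    final PartitionedTable partitionedTable = inputData.partitionBy("symbol");

    // Each constituent is written under a "key=value" directory, e.g.
    // /tmp/partitioned/symbol=AA/data.parquet and /tmp/partitioned/symbol=BB/data.parquet
    final ParquetInstructions writeInstructions = ParquetInstructions.builder()
            .setMetadataRootDir("/tmp/partitioned")
            .build();
    ParquetTools.writeKeyValuePartitionedTable(
            partitionedTable, new File("/tmp/partitioned"), "data", writeInstructions);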
@@ -239,10 +239,9 @@ private static void write(
      * @return the parquet schema
      */
     static MessageType getSchemaForTable(@NotNull final Table table,
-            @NotNull final TableDefinition definition,
             @NotNull final ParquetInstructions instructions) {
-        final Table pretransformTable = pretransformTable(table, definition);
-        return MappedSchema.create(new HashMap<>(), definition, pretransformTable.getRowSet(),
+        final Table pretransformTable = pretransformTable(table, table.getDefinition());
+        return MappedSchema.create(new HashMap<>(), table.getDefinition(), pretransformTable.getRowSet(),
                 pretransformTable.getColumnSourceMap(), instructions).getParquetSchema();
     }
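
This hunk drops the separate TableDefinition parameter so the schema is always derived from the table's own definition. The shape of the change for callers, sketched (getSchemaForTable is package-private, so this only matters inside the parquet writer):

    // Before this commit, callers passed a definition that could diverge from the table's own:
    //     final MessageType schema = getSchemaForTable(table, definition, instructions);
    // After, the definition is always taken from table.getDefinition():
    final MessageType schema = getSchemaForTable(table, instructions);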

@@ -57,6 +57,7 @@
 import java.nio.file.Path;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.*;
+import java.util.stream.Collectors;
 
 import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;
 import static io.deephaven.parquet.table.ParquetTableWriter.getSchemaForTable;
@@ -464,9 +465,9 @@ private static Map<String, ParquetTableWriter.GroupingColumnWritingInfo> groupin
     }
 
     /**
-     * Write tables to disk in parquet format with partitioning columns written as "key=value" format in a nested
-     * directory structure. To generate these individual partitions, this method will call
-     * {@link Table#partitionBy(String...) partitionBy} on all the partitioning columns.
+     * Write a table to disk in parquet format with {@link TableDefinition#getPartitioningColumns() partitioning columns}
+     * written as "key=value" format in a nested directory structure. To generate these individual partitions, this
+     * method will call {@link Table#partitionBy(String...) partitionBy} on all the partitioning columns.
      *
      * @param sourceTable The table to partition and write
      * @param destinationDir The destination directory to store partitioned data in. Non-existing directories are
@@ -480,59 +481,96 @@ public static void writeKeyValuePartitionedTable(@NotNull final Table sourceTabl
             @NotNull final String baseName,
             @NotNull final ParquetInstructions writeInstructions) {
         // TODO Should I take an additional write instruction for partitioning column names in case they are different
-        // from the table's partitioning columns? Also, I couldn't find a doc explaining how a user can convert a
-        // regular column to a partitioning column. So it would be easier for users to just pass the
-        // partitioning column names.
+        // from the table's partitioning columns? Also, should I take partitioning column names from the user as an
+        // argument to this method?
 
         // Also, how should I get the baseName? Should I move it to the write instructions?
         // Pyarrow has an optional parameter "basename_template" used to generate the basenames of written data
         // files. The token '{i}' will be replaced with an automatically incremented integer for files in the same
         // folder. If not specified, it defaults to "someHash-{i}.parquet".
         // pyspark uses names of the form "part.{i}.parquet" and allows passing a naming function that takes an
         // integer and generates a file name.

-        final TableDefinition sourceTableDefinition = sourceTable.getDefinition();
-        final List<ColumnDefinition<?>> partitioningColumns = sourceTableDefinition.getPartitioningColumns();
+        final List<ColumnDefinition<?>> partitioningColumns = sourceTable.getDefinition().getPartitioningColumns();
         if (partitioningColumns.isEmpty()) {
             throw new IllegalArgumentException("Table must have partitioning columns to write partitioned data");
         }
         final String[] partitioningColNames = partitioningColumns.stream()
                 .map(ColumnDefinition::getName)
                 .toArray(String[]::new);
-        final PartitionedTable partitionedData = sourceTable.partitionBy(partitioningColNames);
-        final Table keyTable = sourceTable.selectDistinct(partitioningColNames);
-        final Collection<Table> partitionedTables = new ArrayList<>();
+        final PartitionedTable partitionedTable = sourceTable.partitionBy(partitioningColNames);
+        writeKeyValuePartitionedTable(partitionedTable, destinationDir, baseName, writeInstructions);
+    }
+
+    /**
+     * Write a partitioned table to disk in parquet format with all the {@link PartitionedTable#keyColumnNames() key
+     * columns} written as "key=value" format in a nested directory structure. To generate the partitioned table,
+     * users can call {@link Table#partitionBy(String...) partitionBy} on the required columns.
+     *
+     * @param partitionedTable The partitioned table to write
+     * @param destinationDir The destination directory to store partitioned data in. Non-existing directories are
+     *        created.
+     * @param baseName The base name for the individual partitioned tables. For example, a base name of "table" will
+     *        result in files named "partition1/table.parquet", "partition2/table.parquet", etc.
+     * @param writeInstructions Write instructions for customizations while writing
+     */
+    public static void writeKeyValuePartitionedTable(@NotNull final PartitionedTable partitionedTable,
+            @NotNull final File destinationDir,
+            @NotNull final String baseName,
+            @NotNull final ParquetInstructions writeInstructions) {
+        final String[] partitioningColumnNames = partitionedTable.keyColumnNames().toArray(String[]::new);
+        final Table keyTable;
+        if (partitionedTable.uniqueKeys()) {
+            keyTable = partitionedTable.table().view(partitioningColumnNames);
+        } else {
+            keyTable = partitionedTable.table().selectDistinct(partitioningColumnNames);
+        }
+        final Collection<Table> constituentTables = new ArrayList<>();
         final Collection<File> destinations = new ArrayList<>();
         keyTable.getRowSet().forAllRowKeys(key -> {
-            final Object[] keyValues = Arrays.stream(partitioningColNames)
+            final Object[] keyValues = Arrays.stream(partitioningColumnNames)
                     .map(keyTable::getColumnSource)
                     .map(colSource -> colSource.get(key))
                     .toArray();
             final StringBuilder partitionTableRelativePath = new StringBuilder();
-            for (int i = 0; i < partitioningColNames.length; i++) {
+            for (int i = 0; i < partitioningColumnNames.length; i++) {
                 partitionTableRelativePath.append(File.separator)
-                        .append(partitioningColNames[i]).append("=").append(keyValues[i]);
+                        .append(partitioningColumnNames[i]).append("=").append(keyValues[i]);
             }
             partitionTableRelativePath.append(File.separator).append(baseName).append(".parquet");
             destinations.add(new File(destinationDir, partitionTableRelativePath.toString()));
-            partitionedTables.add(partitionedData.constituentFor(keyValues));
+            constituentTables.add(partitionedTable.constituentFor(keyValues));
         });
 
-        // If needed, generate schema for _common_metadata file from source table
+        // If needed, generate schema for _common_metadata file from key table
         final ParquetInstructions updatedWriteInstructions;
         if (!ParquetInstructions.DEFAULT_METADATA_ROOT_DIR.equals(writeInstructions.getMetadataRootDir())) {
-            final MessageType commonSchema = getSchemaForTable(sourceTable, sourceTableDefinition, writeInstructions);
+            final MessageType commonSchema = getSchemaForTable(keyTable, writeInstructions);
             updatedWriteInstructions = ParquetInstructions.createWithCommonSchema(writeInstructions, commonSchema);
         } else {
            updatedWriteInstructions = writeInstructions;
         }
-        final TableDefinition partitionedTableDefinition = sourceTableDefinition.getWritable(false);
+        final TableDefinition constituentDefinition = getNonKeyTableDefinition(partitionedTable);
         ParquetTools.writeParquetTables(
-                partitionedTables.toArray(Table[]::new),
-                partitionedTableDefinition,
+                constituentTables.toArray(Table[]::new),
+                constituentDefinition,
                 updatedWriteInstructions,
                 destinations.toArray(File[]::new),
-                partitionedTableDefinition.getGroupingColumnNamesArray());
+                constituentDefinition.getGroupingColumnNamesArray());
     }

+    /**
+     * Create a table definition for the non-key columns of a partitioned table
+     */
+    private static TableDefinition getNonKeyTableDefinition(@NotNull final PartitionedTable partitionedTable) {
+        final Collection<String> keyColumnNames = partitionedTable.keyColumnNames();
+        final List<ColumnDefinition<?>> nonKeyColumnDefinitions =
+                partitionedTable.constituentDefinition()
+                        .getColumns().stream()
+                        .filter(columnDefinition -> {
+                            final String columnName = columnDefinition.getName();
+                            return !keyColumnNames.contains(columnName);
+                        }).collect(Collectors.toList());
+        return TableDefinition.of(nonKeyColumnDefinitions);
+    }
 
     /**
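The net effect of the two overloads above: the Table overload partitions the source by its partitioning columns and delegates to the new PartitionedTable overload, which writes one parquet file per key combination and strips the key columns from the constituent definition (via getNonKeyTableDefinition), since their values are already encoded in the directory names. With the three keys used in the test below and a baseName of "data", the on-disk tree has roughly this shape (values illustrative):

    destinationDir/
        symbol=AA/
            epic_collection_id=<collectionId>/
                epic_request_id=<requestId>/
                    data.parquet
        symbol=BB/
            ...

When a metadata root directory is set on the instructions, a common schema for the _common_metadata file is also generated, here from the key table.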
@@ -15,6 +15,7 @@
 import io.deephaven.engine.primitive.iterator.CloseableIterator;
 import io.deephaven.engine.table.ColumnDefinition;
 import io.deephaven.engine.table.ColumnSource;
+import io.deephaven.engine.table.PartitionedTable;
 import io.deephaven.engine.table.impl.SourceTable;
 import io.deephaven.engine.table.impl.select.FunctionalColumn;
 import io.deephaven.engine.table.impl.select.SelectColumn;
@@ -620,8 +621,8 @@ public void writeKeyValuePartitionedDataWithMixedPartitionsTest() {
     public void someMoreKeyValuePartitionedTestsWithComplexKeys() {
         final TableDefinition definition = TableDefinition.of(
                 ColumnDefinition.ofString("symbol").withPartitioning(),
-                ColumnDefinition.ofString("epic_collection_id").withPartitioning(),
-                ColumnDefinition.ofString("epic_request_id").withPartitioning(),
+                ColumnDefinition.ofString("epic_collection_id"),
+                ColumnDefinition.ofString("epic_request_id"),
                 ColumnDefinition.ofLong("I"));
         final Table inputData = ((QueryTable) TableTools.emptyTable(10)
                 .updateView("symbol = (i % 2 == 0) ? `AA` : `BB`",
@@ -634,7 +635,9 @@ public void someMoreKeyValuePartitionedTestsWithComplexKeys() {
         final ParquetInstructions writeInstructions = ParquetInstructions.builder()
                 .setMetadataRootDir(parentDir.getAbsolutePath())
                 .build();
-        writeKeyValuePartitionedTable(inputData, parentDir, "data", writeInstructions);
+        final PartitionedTable partitionedTable =
+                inputData.partitionBy("symbol", "epic_collection_id", "epic_request_id");
+        writeKeyValuePartitionedTable(partitionedTable, parentDir, "data", writeInstructions);
         final Table fromDisk = readKeyValuePartitionedTable(parentDir, EMPTY);
         assertTableEquals(inputData.sort("symbol", "epic_collection_id"),
                 fromDisk.sort("symbol", "epic_collection_id"));
