Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use last lexicographical key for parquet schema inference #4783

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
*/
package io.deephaven.engine.table.impl.locations.impl;

import io.deephaven.base.verify.Require;
import io.deephaven.engine.table.impl.locations.ImmutableTableLocationKey;
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

Expand All @@ -18,17 +17,35 @@
public final class KnownLocationKeyFinder<TLK extends ImmutableTableLocationKey>
implements TableLocationKeyFinder<TLK> {

/**
 * Creates a {@link KnownLocationKeyFinder} holding a copy of the keys reported by {@code finder}.
 *
 * <p>
 * The keys are gathered by passing a {@link RecordingLocationKeyFinder} to {@code finder.findKeys(...)}. If
 * {@code comparator} is not {@code null}, the gathered keys are sorted with it before being copied; otherwise they
 * are kept in the order in which they were recorded.
 *
 * @param <TLK> the table location key type
 * @param finder the finder whose keys are copied
 * @param comparator the comparator used to order the copied keys, or {@code null} to leave the recorded order
 *        unchanged; a comparator declared on a supertype of {@code TLK} is accepted
 * @return the known location finder
 */
public static <TLK extends ImmutableTableLocationKey> KnownLocationKeyFinder<TLK> copyFrom(
        final TableLocationKeyFinder<TLK> finder, final Comparator<? super TLK> comparator) {
    final RecordingLocationKeyFinder<TLK> recordingFinder = new RecordingLocationKeyFinder<>();
    finder.findKeys(recordingFinder);
    final List<TLK> recordedKeys = recordingFinder.getRecordedKeys();
    if (comparator != null) {
        // Sort the recorded list in place; the constructor below copies it into an unmodifiable list.
        recordedKeys.sort(comparator);
    }
    return new KnownLocationKeyFinder<>(recordedKeys);
}

private final List<TLK> knownKeys;

@SafeVarargs
public KnownLocationKeyFinder(@NotNull final TLK... knownKeys) {
Require.elementsNeqNull(knownKeys, "knownKeys");
this.knownKeys = knownKeys.length == 0
? Collections.emptyList()
: Collections.unmodifiableList(
knownKeys.length == 1
? Collections.singletonList(knownKeys[0])
: Arrays.asList(knownKeys));
this(Arrays.asList(knownKeys));
}

/**
 * Creates a finder over an unmodifiable copy of {@code knownKeys}.
 *
 * <p>
 * The keys are copied with {@link List#copyOf}, so later changes to the caller's list are not reflected by this
 * finder. As specified by {@link List#copyOf}, a {@code null} list or a {@code null} element results in a
 * {@link NullPointerException}.
 *
 * @param knownKeys the keys to hold, kept in the list's iteration order
 */
public KnownLocationKeyFinder(@NotNull final List<TLK> knownKeys) {
    this.knownKeys = List.copyOf(knownKeys);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,10 +687,9 @@ public static Table readPartitionedTable(
public static Table readPartitionedTableInferSchema(
@NotNull final TableLocationKeyFinder<ParquetTableLocationKey> locationKeyFinder,
@NotNull final ParquetInstructions readInstructions) {
final RecordingLocationKeyFinder<ParquetTableLocationKey> initialKeys = new RecordingLocationKeyFinder<>();
locationKeyFinder.findKeys(initialKeys);
final List<ParquetTableLocationKey> foundKeys = initialKeys.getRecordedKeys();
if (foundKeys.isEmpty()) {
final KnownLocationKeyFinder<ParquetTableLocationKey> sortedKeys =
KnownLocationKeyFinder.copyFrom(locationKeyFinder, Comparator.naturalOrder());
if (sortedKeys.getKnownKeys().isEmpty()) {
if (readInstructions.isRefreshing()) {
throw new IllegalArgumentException(
"Unable to infer schema for a refreshing partitioned parquet table when there are no initial parquet files");
Expand All @@ -699,7 +698,7 @@ public static Table readPartitionedTableInferSchema(
}
// TODO (https://github.com/deephaven/deephaven-core/issues/877): Support schema merge when discovering multiple
// parquet files
final ParquetTableLocationKey firstKey = foundKeys.get(0);
final ParquetTableLocationKey firstKey = sortedKeys.getKnownKeys().get(0);
devinrsmith marked this conversation as resolved.
Show resolved Hide resolved
final Pair<List<ColumnDefinition<?>>, ParquetInstructions> schemaInfo = convertSchema(
firstKey.getFileReader().getSchema(),
firstKey.getMetadata().getFileMetaData().getKeyValueMetaData(),
Expand All @@ -723,7 +722,7 @@ public static Table readPartitionedTableInferSchema(
ColumnDefinition.ColumnType.Partitioning));
}
allColumns.addAll(schemaInfo.getFirst());
return readPartitionedTable(readInstructions.isRefreshing() ? locationKeyFinder : initialKeys,
return readPartitionedTable(readInstructions.isRefreshing() ? locationKeyFinder : sortedKeys,
schemaInfo.getSecond(), TableDefinition.of(allColumns));
}

Expand Down
Loading