diff --git a/engine/primitive/src/main/java/io/deephaven/engine/primitive/iterator/CloseablePrimitiveIterator.java b/engine/primitive/src/main/java/io/deephaven/engine/primitive/iterator/CloseablePrimitiveIterator.java index 7f29483e1e8..41fe961e769 100644 --- a/engine/primitive/src/main/java/io/deephaven/engine/primitive/iterator/CloseablePrimitiveIterator.java +++ b/engine/primitive/src/main/java/io/deephaven/engine/primitive/iterator/CloseablePrimitiveIterator.java @@ -5,13 +5,7 @@ import io.deephaven.util.SafeCloseable; -import java.util.Iterator; import java.util.PrimitiveIterator; -import java.util.Spliterator; -import java.util.Spliterators; -import java.util.stream.IntStream; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; /** * Interface for {@link SafeCloseable closeable} {@link PrimitiveIterator primitive iterators}. diff --git a/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java b/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java index fe192c436da..eab692ec961 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java @@ -3,31 +3,37 @@ // package io.deephaven.engine.util; -import io.deephaven.chunk.ObjectChunk; -import io.deephaven.chunk.attributes.Values; -import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.primitive.iterator.CloseableIterator; import io.deephaven.engine.rowset.RowSet; -import io.deephaven.engine.table.ChunkSource; import io.deephaven.engine.table.ColumnSource; import io.deephaven.engine.table.Table; +import io.deephaven.engine.table.vectors.ObjectVectorColumnWrapper; +import io.deephaven.vector.ObjectVector; +import io.deephaven.vector.Vector; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.math.BigDecimal; +import java.util.Iterator; import java.util.Properties; /** * Utilities to support BigDecimal exhaust. - * + *

* Parquet and Avro decimal types make a whole column decimal type have a fixed precision and scale; BigDecimal columns * in Deephaven are, each value, arbitrary precision (its own precision and scale). - * + *

* For static tables, it is possible to compute overall precision and scale values that fit every existing value. For * refreshing tables, we need the user to tell us. */ public class BigDecimalUtils { - private static final PrecisionAndScale EMPTY_TABLE_PRECISION_AND_SCALE = new PrecisionAndScale(1, 1); - private static final int TARGET_CHUNK_SIZE = 4096; + public static final int INVALID_PRECISION_OR_SCALE = -1; + private static final PrecisionAndScale EMPTY_TABLE_PRECISION_AND_SCALE = new PrecisionAndScale(1, 1); + private static final int INIT_MAX_PRECISION_MINUS_SCALE = -1; + private static final int INIT_MAX_SCALE = -1; + /** * Immutable way to store and pass precision and scale values. */ @@ -44,14 +50,16 @@ public PrecisionAndScale(final int precision, final int scale) { /** * Compute an overall precision and scale that would fit all existing values in a table. * - * @param t a Deephaven table - * @param colName a Column for {@code t}, which should be of {@code BigDecimal} type - * @return a {@code PrecisionAndScale} object result. + * @param table A Deephaven table + * @param colName Column for {@code table}, which should be of {@code BigDecimal} {@link ColumnSource#getType type} + * or {@link ColumnSource#getComponentType component type} + * @return A {@link PrecisionAndScale} object result. */ public static PrecisionAndScale computePrecisionAndScale( - final Table t, final String colName) { - final ColumnSource src = t.getColumnSource(colName, BigDecimal.class); - return computePrecisionAndScale(t.getRowSet(), src); + final Table table, + final String colName) { + final ColumnSource src = table.getColumnSource(colName); + return computePrecisionAndScale(table.getRowSet(), src); } /** @@ -59,12 +67,13 @@ public static PrecisionAndScale computePrecisionAndScale( * requires a full table scan to ensure the correct values are determined. * * @param rowSet The rowset for the provided column - * @param source a {@code ColumnSource} of {@code BigDecimal} type - * @return a {@code PrecisionAndScale} object result. + * @param columnSource A {@code ColumnSource} of {@code BigDecimal} {@link ColumnSource#getType type} or + * {@link ColumnSource#getComponentType component type} + * @return A {@link PrecisionAndScale} object result. */ public static PrecisionAndScale computePrecisionAndScale( final RowSet rowSet, - final ColumnSource source) { + final ColumnSource columnSource) { if (rowSet.isEmpty()) { return EMPTY_TABLE_PRECISION_AND_SCALE; } @@ -72,39 +81,94 @@ public static PrecisionAndScale computePrecisionAndScale( // We will walk the entire table to determine the max(precision - scale) and // max(scale), which corresponds to max(digits left of the decimal point), max(digits right of the decimal // point). Then we convert to (precision, scale) before returning. - int maxPrecisionMinusScale = -1; - int maxScale = -1; - try (final ChunkSource.GetContext context = source.makeGetContext(TARGET_CHUNK_SIZE); - final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) { - while (it.hasMore()) { - final RowSequence rowSeq = it.getNextRowSequenceWithLength(TARGET_CHUNK_SIZE); - final ObjectChunk chunk = - source.getChunk(context, rowSeq).asObjectChunk(); - for (int i = 0; i < chunk.size(); ++i) { - final BigDecimal x = chunk.get(i); - if (x == null) { - continue; - } - - final int precision = x.precision(); - final int scale = x.scale(); - final int precisionMinusScale = precision - scale; - if (precisionMinusScale > maxPrecisionMinusScale) { - maxPrecisionMinusScale = precisionMinusScale; - } - if (scale > maxScale) { - maxScale = scale; - } + final BigDecimalParameters result = new BigDecimalParameters(INIT_MAX_PRECISION_MINUS_SCALE, INIT_MAX_SCALE); + final ObjectVector columnVector = new ObjectVectorColumnWrapper<>(columnSource, rowSet); + try (final CloseableIterator columnIterator = columnVector.iterator()) { + final Class columnType = columnSource.getType(); + if (columnType == BigDecimal.class) { + // noinspection unchecked + processFlatColumn((Iterator) columnIterator, result); + } else if (columnSource.getComponentType() == BigDecimal.class) { + if (columnType.isArray()) { + // noinspection unchecked + processArrayColumn((Iterator) columnIterator, result); + } else if (Vector.class.isAssignableFrom(columnType)) { + // noinspection unchecked + processVectorColumn((Iterator>) columnIterator, result); } + } else { + throw new IllegalArgumentException("Column source is not of type BigDecimal or an array/vector of " + + "BigDecimal, but of type " + columnType + " and component type " + + columnSource.getComponentType()); } } - // If these are < 0, then every value we visited was null - if (maxPrecisionMinusScale < 0 && maxScale < 0) { + // If these are same as initial values, then every value we visited was null + if (result.maxPrecisionMinusScale == INIT_MAX_PRECISION_MINUS_SCALE && result.maxScale == INIT_MAX_SCALE) { return EMPTY_TABLE_PRECISION_AND_SCALE; } - return new PrecisionAndScale(maxPrecisionMinusScale + maxScale, maxScale); + return new PrecisionAndScale(result.maxPrecisionMinusScale + result.maxScale, result.maxScale); + } + + private static class BigDecimalParameters { + private int maxPrecisionMinusScale; + private int maxScale; + + private BigDecimalParameters(final int maxPrecisionMinusScale, final int maxScale) { + this.maxPrecisionMinusScale = maxPrecisionMinusScale; + this.maxScale = maxScale; + } + + /** + * Update the maximum values for the parameters based on the given value. + */ + private void updateMaximum(@Nullable final BigDecimal value) { + if (value == null) { + return; + } + final int precision = value.precision(); + final int scale = value.scale(); + final int precisionMinusScale = precision - scale; + if (precisionMinusScale > maxPrecisionMinusScale) { + maxPrecisionMinusScale = precisionMinusScale; + } + if (scale > maxScale) { + maxScale = scale; + } + } + } + + private static void processFlatColumn( + @NotNull final Iterator columnIterator, + @NotNull final BigDecimalParameters result) { + columnIterator.forEachRemaining(result::updateMaximum); + } + + private static void processVectorColumn( + @NotNull final Iterator> columnIterator, + @NotNull final BigDecimalParameters result) { + columnIterator.forEachRemaining(values -> { + if (values == null) { + return; + } + try (final CloseableIterator valuesIterator = values.iterator()) { + valuesIterator.forEachRemaining(result::updateMaximum); + } + }); + } + + private static void processArrayColumn( + @NotNull final Iterator columnIterator, + @NotNull final BigDecimalParameters result) { + columnIterator.forEachRemaining(values -> { + if (values == null) { + return; + } + for (final BigDecimal value : values) { + result.updateMaximum(value); + } + }); } /** diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java index 9307d18ad33..b95dfe98412 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java @@ -124,7 +124,7 @@ public static PrecisionAndScale getPrecisionAndScale( @NotNull final Map> computedCache, @NotNull final String columnName, @NotNull final RowSet rowSet, - @NotNull Supplier> columnSourceSupplier) { + @NotNull final Supplier> columnSourceSupplier) { return (PrecisionAndScale) computedCache .computeIfAbsent(columnName, unusedColumnName -> new HashMap<>()) .computeIfAbsent(ParquetCacheTags.DECIMAL_ARGS, @@ -152,7 +152,7 @@ static TypeInfo bigDecimalTypeInfo( final String columnName = column.getName(); // noinspection unchecked final PrecisionAndScale precisionAndScale = getPrecisionAndScale( - computedCache, columnName, rowSet, () -> (ColumnSource) columnSourceMap.get(columnName)); + computedCache, columnName, rowSet, () -> columnSourceMap.get(columnName)); final Set> clazzes = Set.of(BigDecimal.class); return new TypeInfo() { @Override @@ -175,14 +175,9 @@ static TypeInfo getTypeInfo( final RowSet rowSet, final Map> columnSourceMap, @NotNull final ParquetInstructions instructions) { - if (BigDecimal.class.equals(column.getDataType())) { + if (column.getDataType() == BigDecimal.class || column.getComponentType() == BigDecimal.class) { return bigDecimalTypeInfo(computedCache, column, rowSet, columnSourceMap); } - if (BigDecimal.class.equals(column.getComponentType())) { - throw new UnsupportedOperationException("Writing arrays/vector columns for big decimals is currently not " + - "supported"); - // TODO(deephaven-core#4612): Add support for this - } return lookupTypeInfo(column, instructions); } diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TransferObject.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TransferObject.java index e87eff5aa85..fac10941cf1 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TransferObject.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/transfer/TransferObject.java @@ -39,7 +39,7 @@ static TransferObject create( @NotNull final Map> computedCache, @NotNull final String columnName, @NotNull final ColumnSource columnSource) { - Class columnType = columnSource.getType(); + final Class columnType = columnSource.getType(); if (columnType == int.class) { return IntTransfer.create(columnSource, tableRowSet, instructions.getTargetPageSize()); } @@ -84,13 +84,11 @@ static TransferObject create( return new CodecTransfer<>(columnSource, codec, tableRowSet, instructions.getTargetPageSize()); } if (columnType == BigDecimal.class) { - // noinspection unchecked - final ColumnSource bigDecimalColumnSource = (ColumnSource) columnSource; final BigDecimalUtils.PrecisionAndScale precisionAndScale = TypeInfos.getPrecisionAndScale( - computedCache, columnName, tableRowSet, () -> bigDecimalColumnSource); + computedCache, columnName, tableRowSet, () -> columnSource); final ObjectCodec codec = new BigDecimalParquetBytesCodec( precisionAndScale.precision, precisionAndScale.scale); - return new CodecTransfer<>(bigDecimalColumnSource, codec, tableRowSet, instructions.getTargetPageSize()); + return new CodecTransfer<>(columnSource, codec, tableRowSet, instructions.getTargetPageSize()); } if (columnType == BigInteger.class) { return new CodecTransfer<>(columnSource, new BigIntegerParquetBytesCodec(), tableRowSet, @@ -136,6 +134,13 @@ static TransferObject create( if (componentType == String.class) { return new StringArrayTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); } + if (componentType == BigDecimal.class) { + final BigDecimalUtils.PrecisionAndScale precisionAndScale = TypeInfos.getPrecisionAndScale( + computedCache, columnName, tableRowSet, () -> columnSource); + final ObjectCodec codec = new BigDecimalParquetBytesCodec( + precisionAndScale.precision, precisionAndScale.scale); + return new CodecArrayTransfer<>(columnSource, codec, tableRowSet, instructions.getTargetPageSize()); + } if (componentType == BigInteger.class) { return new CodecArrayTransfer<>(columnSource, new BigIntegerParquetBytesCodec(), tableRowSet, instructions.getTargetPageSize()); @@ -152,7 +157,7 @@ static TransferObject create( if (componentType == LocalDateTime.class) { return new LocalDateTimeArrayTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); } - // TODO(deephaven-core#4612): Handle arrays of BigDecimal and if explicit codec provided + // TODO(deephaven-core#4612): Handle if explicit codec provided } if (Vector.class.isAssignableFrom(columnType)) { if (componentType == int.class) { @@ -182,6 +187,13 @@ static TransferObject create( if (componentType == String.class) { return new StringVectorTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); } + if (componentType == BigDecimal.class) { + final BigDecimalUtils.PrecisionAndScale precisionAndScale = TypeInfos.getPrecisionAndScale( + computedCache, columnName, tableRowSet, () -> columnSource); + final ObjectCodec codec = new BigDecimalParquetBytesCodec( + precisionAndScale.precision, precisionAndScale.scale); + return new CodecVectorTransfer<>(columnSource, codec, tableRowSet, instructions.getTargetPageSize()); + } if (componentType == BigInteger.class) { return new CodecVectorTransfer<>(columnSource, new BigIntegerParquetBytesCodec(), tableRowSet, instructions.getTargetPageSize()); @@ -198,7 +210,7 @@ static TransferObject create( if (componentType == LocalDateTime.class) { return new LocalDateTimeVectorTransfer(columnSource, tableRowSet, instructions.getTargetPageSize()); } - // TODO(deephaven-core#4612): Handle vectors of BigDecimal and if explicit codec provided + // TODO(deephaven-core#4612): Handle if explicit codec provided } // Go with the default diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java index 5d104ec8113..a6d683afcc0 100644 --- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java +++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java @@ -1832,29 +1832,6 @@ private static Table arrayToVectorTable(final Table table) { return arrayToVectorFormulas.isEmpty() ? table : table.updateView(arrayToVectorFormulas); } - @Test - public void testBigDecimalArrayColumn() { - final Table bdArrayTable = TableTools.emptyTable(10000).select(Selectable.from(List.of( - "someBigDecimalArrayColumn = new java.math.BigDecimal[] {i % 10 == 0 ? null : " + - "java.math.BigDecimal.valueOf(ii).stripTrailingZeros()}"))); - final File dest = new File(rootFile + File.separator + "testBigDecimalArrayColumn.parquet"); - try { - ParquetTools.writeTable(bdArrayTable, dest.getPath()); - fail("Expected exception because writing arrays of big decimal column types is not supported"); - } catch (final RuntimeException e) { - assertTrue(e.getCause() instanceof UnsupportedOperationException); - } - - // Convert array to vector table - final Table bdVectorTable = arrayToVectorTable(bdArrayTable); - try { - ParquetTools.writeTable(bdVectorTable, dest.getPath()); - fail("Expected exception because writing vectors of big decimal column types is not supported"); - } catch (final RuntimeException e) { - assertTrue(e.getCause() instanceof UnsupportedOperationException); - } - } - @Test public void testArrayColumns() { ArrayList columns = @@ -1869,6 +1846,7 @@ public void testArrayColumns() { "someByteArrayColumn = new byte[] {i % 10 == 0 ? null : (byte)i}", "someCharArrayColumn = new char[] {i % 10 == 0 ? null : (char)i}", "someTimeArrayColumn = new Instant[] {i % 10 == 0 ? null : (Instant)DateTimeUtils.now() + i}", + "someBigDecimalArrayColumn = new java.math.BigDecimal[] {i % 10 == 0 ? null : java.math.BigDecimal.valueOf(ii).stripTrailingZeros()}", "someBiArrayColumn = new java.math.BigInteger[] {i % 10 == 0 ? null : java.math.BigInteger.valueOf(i)}", "someDateArrayColumn = new java.time.LocalDate[] {i % 10 == 0 ? null : java.time.LocalDate.ofEpochDay(i)}", "someTimeArrayColumn = new java.time.LocalTime[] {i % 10 == 0 ? null : java.time.LocalTime.of(i%24, i%60, (i+10)%60)}", diff --git a/py/server/tests/test_parquet.py b/py/server/tests/test_parquet.py index 50c8cf6f68e..21dea3db552 100644 --- a/py/server/tests/test_parquet.py +++ b/py/server/tests/test_parquet.py @@ -230,6 +230,8 @@ def get_table_with_array_data(self): "someCharArrayColumn = new char[] {i % 10 == 0 ? null : (char)i}", "someTimeArrayColumn = new Instant[] {i % 10 == 0 ? null : (Instant)DateTimeUtils.now() + i}", "someBiColumn = new java.math.BigInteger[] {i % 10 == 0 ? null : java.math.BigInteger.valueOf(i)}", + "someBdColumn = new java.math.BigDecimal[] {i % 10 == 0 ? null : " + + "java.math.BigDecimal.valueOf(ii).stripTrailingZeros()}", "nullStringArrayColumn = new String[] {(String)null}", "nullIntArrayColumn = new int[] {(int)null}", "nullLongArrayColumn = new long[] {(long)null}", @@ -240,7 +242,8 @@ def get_table_with_array_data(self): "nullByteArrayColumn = new byte[] {(byte)null}", "nullCharArrayColumn = new char[] {(char)null}", "nullTimeArrayColumn = new Instant[] {(Instant)null}", - "nullBiColumn = new java.math.BigInteger[] {(java.math.BigInteger)null}" + "nullBiColumn = new java.math.BigInteger[] {(java.math.BigInteger)null}", + "nullBdColumn = new java.math.BigDecimal[] {(java.math.BigDecimal)null}" ]) return dh_table