From cd2039f439eb511a3e66de63b88c74867d6471f4 Mon Sep 17 00:00:00 2001 From: Colin Alworth Date: Tue, 25 Jun 2024 13:44:40 -0500 Subject: [PATCH] Commit #4, replicate new chunk readers for primitives --- .../chunk/ByteChunkInputStreamGenerator.java | 197 ----------------- .../barrage/chunk/ByteChunkReader.java | 204 ++++++++++++++++++ .../chunk/CharChunkInputStreamGenerator.java | 197 ----------------- .../barrage/chunk/CharChunkReader.java | 200 +++++++++++++++++ .../chunk/DefaultChunkReadingFactory.java | 51 ++--- .../DoubleChunkInputStreamGenerator.java | 197 ----------------- .../barrage/chunk/DoubleChunkReader.java | 204 ++++++++++++++++++ .../chunk/FloatChunkInputStreamGenerator.java | 197 ----------------- .../barrage/chunk/FloatChunkReader.java | 204 ++++++++++++++++++ .../chunk/IntChunkInputStreamGenerator.java | 197 ----------------- .../barrage/chunk/IntChunkReader.java | 204 ++++++++++++++++++ .../chunk/LongChunkInputStreamGenerator.java | 197 ----------------- .../barrage/chunk/LongChunkReader.java | 204 ++++++++++++++++++ .../chunk/ShortChunkInputStreamGenerator.java | 197 ----------------- .../barrage/chunk/ShortChunkReader.java | 204 ++++++++++++++++++ .../replicators/ReplicateBarrageUtils.java | 3 + 16 files changed, 1440 insertions(+), 1417 deletions(-) create mode 100644 extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java create mode 100644 extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkReader.java create mode 100644 extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java create mode 100644 extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkReader.java create mode 100644 extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java create mode 100644 extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java create mode 100644 extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkInputStreamGenerator.java index c2cc7f3e453..d334e031bed 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkInputStreamGenerator.java @@ -7,10 +7,7 @@ // @formatter:off package io.deephaven.extensions.barrage.chunk; -import io.deephaven.base.verify.Assert; import io.deephaven.chunk.ObjectChunk; -import io.deephaven.chunk.WritableChunk; -import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.chunk.util.pools.PoolableChunk; import io.deephaven.engine.primitive.function.ToByteFunction; @@ -21,17 +18,11 @@ import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.ByteChunk; import io.deephaven.chunk.WritableByteChunk; -import io.deephaven.chunk.WritableLongChunk; import io.deephaven.util.type.TypeUtils; import org.jetbrains.annotations.Nullable; -import java.io.DataInput; import java.io.IOException; import java.io.OutputStream; -import java.util.Iterator; -import java.util.PrimitiveIterator; -import java.util.function.Function; -import java.util.function.IntFunction; import static io.deephaven.util.QueryConstants.*; @@ 
-167,192 +158,4 @@ public int drainTo(final OutputStream outputStream) throws IOException { return LongSizedDataStructure.intSize("ByteChunkInputStreamGenerator", bytesWritten); } } - - @FunctionalInterface - public interface ByteConversion { - byte apply(byte in); - - ByteConversion IDENTITY = (byte a) -> a; - } - - static WritableByteChunk extractChunkFromInputStream( - final StreamReaderOptions options, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - return extractChunkFromInputStreamWithConversion( - options, ByteConversion.IDENTITY, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, - totalRows); - } - - static WritableObjectChunk extractChunkFromInputStreamWithTransform( - final StreamReaderOptions options, - final Function transform, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - - try (final WritableByteChunk inner = extractChunkFromInputStream( - options, fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { - - final WritableObjectChunk chunk = castOrCreateChunk( - outChunk, - Math.max(totalRows, inner.size()), - WritableObjectChunk::makeWritableChunk, - WritableChunk::asWritableObjectChunk); - - if (outChunk == null) { - // if we're not given an output chunk then we better be writing at the front of the new one - Assert.eqZero(outOffset, "outOffset"); - } - - for (int ii = 0; ii < inner.size(); ++ii) { - byte value = inner.get(ii); - chunk.set(outOffset + ii, transform.apply(value)); - } - - return chunk; - } - } - - static WritableByteChunk extractChunkFromInputStreamWithConversion( - final StreamReaderOptions options, - final ByteConversion conversion, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - - final FieldNodeInfo nodeInfo = fieldNodeIter.next(); - final long validityBuffer = bufferInfoIter.nextLong(); - final long payloadBuffer = bufferInfoIter.nextLong(); - - final WritableByteChunk chunk = castOrCreateChunk( - outChunk, - Math.max(totalRows, nodeInfo.numElements), - WritableByteChunk::makeWritableChunk, - WritableChunk::asWritableByteChunk); - - if (nodeInfo.numElements == 0) { - return chunk; - } - - final int numValidityLongs = options.useDeephavenNulls() ? 
0 : (nodeInfo.numElements + 63) / 64; - try (final WritableLongChunk isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) { - if (options.useDeephavenNulls() && validityBuffer != 0) { - throw new IllegalStateException("validity buffer is non-empty, but is unnecessary"); - } - int jj = 0; - for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) { - isValid.set(jj, is.readLong()); - } - final long valBufRead = jj * 8L; - if (valBufRead < validityBuffer) { - is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead)); - } - // we support short validity buffers - for (; jj < numValidityLongs; ++jj) { - isValid.set(jj, -1); // -1 is bit-wise representation of all ones - } - // consumed entire validity buffer by here - - final long payloadRead = (long) nodeInfo.numElements * Byte.BYTES; - Assert.geq(payloadBuffer, "payloadBuffer", payloadRead, "payloadRead"); - - if (options.useDeephavenNulls()) { - useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset); - } else { - useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid); - } - - final long overhangPayload = payloadBuffer - payloadRead; - if (overhangPayload > 0) { - is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload)); - } - } - - return chunk; - } - - private static > T castOrCreateChunk( - final WritableChunk outChunk, - final int numRows, - final IntFunction chunkFactory, - final Function, T> castFunction) { - if (outChunk != null) { - return castFunction.apply(outChunk); - } - final T newChunk = chunkFactory.apply(numRows); - newChunk.setSize(numRows); - return newChunk; - } - - private static void useDeephavenNulls( - final ByteConversion conversion, - final DataInput is, - final FieldNodeInfo nodeInfo, - final WritableByteChunk chunk, - final int offset) throws IOException { - if (conversion == ByteConversion.IDENTITY) { - for (int ii = 0; ii < nodeInfo.numElements; ++ii) { - chunk.set(offset + ii, is.readByte()); - } - } else { - for (int ii = 0; ii < nodeInfo.numElements; ++ii) { - final byte in = is.readByte(); - final byte out = in == NULL_BYTE ? 
in : conversion.apply(in); - chunk.set(offset + ii, out); - } - } - } - - private static void useValidityBuffer( - final ByteConversion conversion, - final DataInput is, - final FieldNodeInfo nodeInfo, - final WritableByteChunk chunk, - final int offset, - final WritableLongChunk isValid) throws IOException { - final int numElements = nodeInfo.numElements; - final int numValidityWords = (numElements + 63) / 64; - - int ei = 0; - int pendingSkips = 0; - - for (int vi = 0; vi < numValidityWords; ++vi) { - int bitsLeftInThisWord = Math.min(64, numElements - vi * 64); - long validityWord = isValid.get(vi); - do { - if ((validityWord & 1) == 1) { - if (pendingSkips > 0) { - is.skipBytes(pendingSkips * Byte.BYTES); - chunk.fillWithNullValue(offset + ei, pendingSkips); - ei += pendingSkips; - pendingSkips = 0; - } - chunk.set(offset + ei++, conversion.apply(is.readByte())); - validityWord >>= 1; - bitsLeftInThisWord--; - } else { - final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord); - pendingSkips += skips; - validityWord >>= skips; - bitsLeftInThisWord -= skips; - } - } while (bitsLeftInThisWord > 0); - } - - if (pendingSkips > 0) { - is.skipBytes(pendingSkips * Byte.BYTES); - chunk.fillWithNullValue(offset + ei, pendingSkips); - } - } } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java new file mode 100644 index 00000000000..29bee0fea05 --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ByteChunkReader.java @@ -0,0 +1,204 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +// ****** AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY +// ****** Edit CharChunkReader and run "./gradlew replicateBarrageUtils" to regenerate +// +// @formatter:off +package io.deephaven.extensions.barrage.chunk; + +import io.deephaven.base.verify.Assert; +import io.deephaven.chunk.WritableByteChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.extensions.barrage.util.StreamReaderOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; + +import java.io.DataInput; +import java.io.IOException; +import java.util.Iterator; +import java.util.PrimitiveIterator; +import java.util.function.Function; +import java.util.function.IntFunction; + +import static io.deephaven.util.QueryConstants.NULL_BYTE; + +public class ByteChunkReader implements ChunkReader { + private static final String DEBUG_NAME = "ByteChunkReader"; + private final StreamReaderOptions options; + private final ByteConversion conversion; + + @FunctionalInterface + public interface ByteConversion { + byte apply(byte in); + + ByteConversion IDENTITY = (byte a) -> a; + } + + public ByteChunkReader(StreamReaderOptions options) { + this(options, ByteConversion.IDENTITY); + } + + public ByteChunkReader(StreamReaderOptions options, ByteConversion conversion) { + this.options = options; + this.conversion = conversion; + } + + public ChunkReader transform(Function transform) { + return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> { + try (final WritableByteChunk inner = ByteChunkReader.this.read( + fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { + + final WritableObjectChunk chunk = castOrCreateChunk( + outChunk, + 
Math.max(totalRows, inner.size()), + WritableObjectChunk::makeWritableChunk, + WritableChunk::asWritableObjectChunk); + + if (outChunk == null) { + // if we're not given an output chunk then we better be writing at the front of the new one + Assert.eqZero(outOffset, "outOffset"); + } + + for (int ii = 0; ii < inner.size(); ++ii) { + byte value = inner.get(ii); + chunk.set(outOffset + ii, transform.apply(value)); + } + + return chunk; + } + }; + } + + @Override + public WritableByteChunk read(Iterator fieldNodeIter, + PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk outChunk, int outOffset, + int totalRows) throws IOException { + + final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next(); + final long validityBuffer = bufferInfoIter.nextLong(); + final long payloadBuffer = bufferInfoIter.nextLong(); + + final WritableByteChunk chunk = castOrCreateChunk( + outChunk, + Math.max(totalRows, nodeInfo.numElements), + WritableByteChunk::makeWritableChunk, + WritableChunk::asWritableByteChunk); + + if (nodeInfo.numElements == 0) { + return chunk; + } + + final int numValidityLongs = options.useDeephavenNulls() ? 0 : (nodeInfo.numElements + 63) / 64; + try (final WritableLongChunk isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) { + if (options.useDeephavenNulls() && validityBuffer != 0) { + throw new IllegalStateException("validity buffer is non-empty, but is unnecessary"); + } + int jj = 0; + for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) { + isValid.set(jj, is.readLong()); + } + final long valBufRead = jj * 8L; + if (valBufRead < validityBuffer) { + is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead)); + } + // we support short validity buffers + for (; jj < numValidityLongs; ++jj) { + isValid.set(jj, -1); // -1 is bit-wise representation of all ones + } + // consumed entire validity buffer by here + + final long payloadRead = (long) nodeInfo.numElements * Byte.BYTES; + Assert.geq(payloadBuffer, "payloadBuffer", payloadRead, "payloadRead"); + + if (options.useDeephavenNulls()) { + useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset); + } else { + useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid); + } + + final long overhangPayload = payloadBuffer - payloadRead; + if (overhangPayload > 0) { + is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload)); + } + } + + return chunk; + } + + private static > T castOrCreateChunk( + final WritableChunk outChunk, + final int numRows, + final IntFunction chunkFactory, + final Function, T> castFunction) { + if (outChunk != null) { + return castFunction.apply(outChunk); + } + final T newChunk = chunkFactory.apply(numRows); + newChunk.setSize(numRows); + return newChunk; + } + + private static void useDeephavenNulls( + final ByteConversion conversion, + final DataInput is, + final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo, + final WritableByteChunk chunk, + final int offset) throws IOException { + if (conversion == ByteConversion.IDENTITY) { + for (int ii = 0; ii < nodeInfo.numElements; ++ii) { + chunk.set(offset + ii, is.readByte()); + } + } else { + for (int ii = 0; ii < nodeInfo.numElements; ++ii) { + final byte in = is.readByte(); + final byte out = in == NULL_BYTE ? 
in : conversion.apply(in); + chunk.set(offset + ii, out); + } + } + } + + private static void useValidityBuffer( + final ByteConversion conversion, + final DataInput is, + final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo, + final WritableByteChunk chunk, + final int offset, + final WritableLongChunk isValid) throws IOException { + final int numElements = nodeInfo.numElements; + final int numValidityWords = (numElements + 63) / 64; + + int ei = 0; + int pendingSkips = 0; + + for (int vi = 0; vi < numValidityWords; ++vi) { + int bitsLeftInThisWord = Math.min(64, numElements - vi * 64); + long validityWord = isValid.get(vi); + do { + if ((validityWord & 1) == 1) { + if (pendingSkips > 0) { + is.skipBytes(pendingSkips * Byte.BYTES); + chunk.fillWithNullValue(offset + ei, pendingSkips); + ei += pendingSkips; + pendingSkips = 0; + } + chunk.set(offset + ei++, conversion.apply(is.readByte())); + validityWord >>= 1; + bitsLeftInThisWord--; + } else { + final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord); + pendingSkips += skips; + validityWord >>= skips; + bitsLeftInThisWord -= skips; + } + } while (bitsLeftInThisWord > 0); + } + + if (pendingSkips > 0) { + is.skipBytes(pendingSkips * Byte.BYTES); + chunk.fillWithNullValue(offset + ei, pendingSkips); + } + } +} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkInputStreamGenerator.java index 878bc0a6cd6..83b1f2f72f1 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkInputStreamGenerator.java @@ -3,10 +3,7 @@ // package io.deephaven.extensions.barrage.chunk; -import io.deephaven.base.verify.Assert; import io.deephaven.chunk.ObjectChunk; -import io.deephaven.chunk.WritableChunk; -import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.chunk.util.pools.PoolableChunk; import io.deephaven.engine.primitive.function.ToCharFunction; @@ -17,17 +14,11 @@ import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.CharChunk; import io.deephaven.chunk.WritableCharChunk; -import io.deephaven.chunk.WritableLongChunk; import io.deephaven.util.type.TypeUtils; import org.jetbrains.annotations.Nullable; -import java.io.DataInput; import java.io.IOException; import java.io.OutputStream; -import java.util.Iterator; -import java.util.PrimitiveIterator; -import java.util.function.Function; -import java.util.function.IntFunction; import static io.deephaven.util.QueryConstants.*; @@ -163,192 +154,4 @@ public int drainTo(final OutputStream outputStream) throws IOException { return LongSizedDataStructure.intSize("CharChunkInputStreamGenerator", bytesWritten); } } - - @FunctionalInterface - public interface CharConversion { - char apply(char in); - - CharConversion IDENTITY = (char a) -> a; - } - - static WritableCharChunk extractChunkFromInputStream( - final StreamReaderOptions options, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - return extractChunkFromInputStreamWithConversion( - options, CharConversion.IDENTITY, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, - totalRows); - } - - 
static WritableObjectChunk extractChunkFromInputStreamWithTransform( - final StreamReaderOptions options, - final Function transform, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - - try (final WritableCharChunk inner = extractChunkFromInputStream( - options, fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { - - final WritableObjectChunk chunk = castOrCreateChunk( - outChunk, - Math.max(totalRows, inner.size()), - WritableObjectChunk::makeWritableChunk, - WritableChunk::asWritableObjectChunk); - - if (outChunk == null) { - // if we're not given an output chunk then we better be writing at the front of the new one - Assert.eqZero(outOffset, "outOffset"); - } - - for (int ii = 0; ii < inner.size(); ++ii) { - char value = inner.get(ii); - chunk.set(outOffset + ii, transform.apply(value)); - } - - return chunk; - } - } - - static WritableCharChunk extractChunkFromInputStreamWithConversion( - final StreamReaderOptions options, - final CharConversion conversion, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - - final FieldNodeInfo nodeInfo = fieldNodeIter.next(); - final long validityBuffer = bufferInfoIter.nextLong(); - final long payloadBuffer = bufferInfoIter.nextLong(); - - final WritableCharChunk chunk = castOrCreateChunk( - outChunk, - Math.max(totalRows, nodeInfo.numElements), - WritableCharChunk::makeWritableChunk, - WritableChunk::asWritableCharChunk); - - if (nodeInfo.numElements == 0) { - return chunk; - } - - final int numValidityLongs = options.useDeephavenNulls() ? 
0 : (nodeInfo.numElements + 63) / 64; - try (final WritableLongChunk isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) { - if (options.useDeephavenNulls() && validityBuffer != 0) { - throw new IllegalStateException("validity buffer is non-empty, but is unnecessary"); - } - int jj = 0; - for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) { - isValid.set(jj, is.readLong()); - } - final long valBufRead = jj * 8L; - if (valBufRead < validityBuffer) { - is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead)); - } - // we support short validity buffers - for (; jj < numValidityLongs; ++jj) { - isValid.set(jj, -1); // -1 is bit-wise representation of all ones - } - // consumed entire validity buffer by here - - final long payloadRead = (long) nodeInfo.numElements * Character.BYTES; - Assert.geq(payloadBuffer, "payloadBuffer", payloadRead, "payloadRead"); - - if (options.useDeephavenNulls()) { - useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset); - } else { - useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid); - } - - final long overhangPayload = payloadBuffer - payloadRead; - if (overhangPayload > 0) { - is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload)); - } - } - - return chunk; - } - - private static > T castOrCreateChunk( - final WritableChunk outChunk, - final int numRows, - final IntFunction chunkFactory, - final Function, T> castFunction) { - if (outChunk != null) { - return castFunction.apply(outChunk); - } - final T newChunk = chunkFactory.apply(numRows); - newChunk.setSize(numRows); - return newChunk; - } - - private static void useDeephavenNulls( - final CharConversion conversion, - final DataInput is, - final FieldNodeInfo nodeInfo, - final WritableCharChunk chunk, - final int offset) throws IOException { - if (conversion == CharConversion.IDENTITY) { - for (int ii = 0; ii < nodeInfo.numElements; ++ii) { - chunk.set(offset + ii, is.readChar()); - } - } else { - for (int ii = 0; ii < nodeInfo.numElements; ++ii) { - final char in = is.readChar(); - final char out = in == NULL_CHAR ? 
in : conversion.apply(in); - chunk.set(offset + ii, out); - } - } - } - - private static void useValidityBuffer( - final CharConversion conversion, - final DataInput is, - final FieldNodeInfo nodeInfo, - final WritableCharChunk chunk, - final int offset, - final WritableLongChunk isValid) throws IOException { - final int numElements = nodeInfo.numElements; - final int numValidityWords = (numElements + 63) / 64; - - int ei = 0; - int pendingSkips = 0; - - for (int vi = 0; vi < numValidityWords; ++vi) { - int bitsLeftInThisWord = Math.min(64, numElements - vi * 64); - long validityWord = isValid.get(vi); - do { - if ((validityWord & 1) == 1) { - if (pendingSkips > 0) { - is.skipBytes(pendingSkips * Character.BYTES); - chunk.fillWithNullValue(offset + ei, pendingSkips); - ei += pendingSkips; - pendingSkips = 0; - } - chunk.set(offset + ei++, conversion.apply(is.readChar())); - validityWord >>= 1; - bitsLeftInThisWord--; - } else { - final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord); - pendingSkips += skips; - validityWord >>= skips; - bitsLeftInThisWord -= skips; - } - } while (bitsLeftInThisWord > 0); - } - - if (pendingSkips > 0) { - is.skipBytes(pendingSkips * Character.BYTES); - chunk.fillWithNullValue(offset + ei, pendingSkips); - } - } } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkReader.java new file mode 100644 index 00000000000..b6fce96ffbf --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/CharChunkReader.java @@ -0,0 +1,200 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.barrage.chunk; + +import io.deephaven.base.verify.Assert; +import io.deephaven.chunk.WritableCharChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.extensions.barrage.util.StreamReaderOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; + +import java.io.DataInput; +import java.io.IOException; +import java.util.Iterator; +import java.util.PrimitiveIterator; +import java.util.function.Function; +import java.util.function.IntFunction; + +import static io.deephaven.util.QueryConstants.NULL_CHAR; + +public class CharChunkReader implements ChunkReader { + private static final String DEBUG_NAME = "CharChunkReader"; + private final StreamReaderOptions options; + private final CharConversion conversion; + + @FunctionalInterface + public interface CharConversion { + char apply(char in); + + CharConversion IDENTITY = (char a) -> a; + } + + public CharChunkReader(StreamReaderOptions options) { + this(options, CharConversion.IDENTITY); + } + + public CharChunkReader(StreamReaderOptions options, CharConversion conversion) { + this.options = options; + this.conversion = conversion; + } + + public ChunkReader transform(Function transform) { + return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> { + try (final WritableCharChunk inner = CharChunkReader.this.read( + fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { + + final WritableObjectChunk chunk = castOrCreateChunk( + outChunk, + Math.max(totalRows, inner.size()), + WritableObjectChunk::makeWritableChunk, + WritableChunk::asWritableObjectChunk); + + if (outChunk == null) { + // if we're not given 
an output chunk then we better be writing at the front of the new one + Assert.eqZero(outOffset, "outOffset"); + } + + for (int ii = 0; ii < inner.size(); ++ii) { + char value = inner.get(ii); + chunk.set(outOffset + ii, transform.apply(value)); + } + + return chunk; + } + }; + } + + @Override + public WritableCharChunk read(Iterator fieldNodeIter, + PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk outChunk, int outOffset, + int totalRows) throws IOException { + + final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next(); + final long validityBuffer = bufferInfoIter.nextLong(); + final long payloadBuffer = bufferInfoIter.nextLong(); + + final WritableCharChunk chunk = castOrCreateChunk( + outChunk, + Math.max(totalRows, nodeInfo.numElements), + WritableCharChunk::makeWritableChunk, + WritableChunk::asWritableCharChunk); + + if (nodeInfo.numElements == 0) { + return chunk; + } + + final int numValidityLongs = options.useDeephavenNulls() ? 0 : (nodeInfo.numElements + 63) / 64; + try (final WritableLongChunk isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) { + if (options.useDeephavenNulls() && validityBuffer != 0) { + throw new IllegalStateException("validity buffer is non-empty, but is unnecessary"); + } + int jj = 0; + for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) { + isValid.set(jj, is.readLong()); + } + final long valBufRead = jj * 8L; + if (valBufRead < validityBuffer) { + is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead)); + } + // we support short validity buffers + for (; jj < numValidityLongs; ++jj) { + isValid.set(jj, -1); // -1 is bit-wise representation of all ones + } + // consumed entire validity buffer by here + + final long payloadRead = (long) nodeInfo.numElements * Character.BYTES; + Assert.geq(payloadBuffer, "payloadBuffer", payloadRead, "payloadRead"); + + if (options.useDeephavenNulls()) { + useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset); + } else { + useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid); + } + + final long overhangPayload = payloadBuffer - payloadRead; + if (overhangPayload > 0) { + is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload)); + } + } + + return chunk; + } + + private static > T castOrCreateChunk( + final WritableChunk outChunk, + final int numRows, + final IntFunction chunkFactory, + final Function, T> castFunction) { + if (outChunk != null) { + return castFunction.apply(outChunk); + } + final T newChunk = chunkFactory.apply(numRows); + newChunk.setSize(numRows); + return newChunk; + } + + private static void useDeephavenNulls( + final CharConversion conversion, + final DataInput is, + final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo, + final WritableCharChunk chunk, + final int offset) throws IOException { + if (conversion == CharConversion.IDENTITY) { + for (int ii = 0; ii < nodeInfo.numElements; ++ii) { + chunk.set(offset + ii, is.readChar()); + } + } else { + for (int ii = 0; ii < nodeInfo.numElements; ++ii) { + final char in = is.readChar(); + final char out = in == NULL_CHAR ? 
in : conversion.apply(in); + chunk.set(offset + ii, out); + } + } + } + + private static void useValidityBuffer( + final CharConversion conversion, + final DataInput is, + final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo, + final WritableCharChunk chunk, + final int offset, + final WritableLongChunk isValid) throws IOException { + final int numElements = nodeInfo.numElements; + final int numValidityWords = (numElements + 63) / 64; + + int ei = 0; + int pendingSkips = 0; + + for (int vi = 0; vi < numValidityWords; ++vi) { + int bitsLeftInThisWord = Math.min(64, numElements - vi * 64); + long validityWord = isValid.get(vi); + do { + if ((validityWord & 1) == 1) { + if (pendingSkips > 0) { + is.skipBytes(pendingSkips * Character.BYTES); + chunk.fillWithNullValue(offset + ei, pendingSkips); + ei += pendingSkips; + pendingSkips = 0; + } + chunk.set(offset + ei++, conversion.apply(is.readChar())); + validityWord >>= 1; + bitsLeftInThisWord--; + } else { + final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord); + pendingSkips += skips; + validityWord >>= skips; + bitsLeftInThisWord -= skips; + } + } while (bitsLeftInThisWord > 0); + } + + if (pendingSkips > 0) { + is.skipBytes(pendingSkips * Character.BYTES); + chunk.fillWithNullValue(offset + ei, pendingSkips); + } + } +} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java index df41a1ae7ca..bbf5b398fe7 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java @@ -37,45 +37,28 @@ public ChunkReader extractChunkFromInputStream(StreamReaderOptions options, int case Boolean: throw new UnsupportedOperationException("Booleans are reinterpreted as bytes"); case Char: - return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, - totalRows) -> CharChunkInputStreamGenerator.extractChunkFromInputStream( - options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); + return new CharChunkReader(options); case Byte: if (typeInfo.type() == Boolean.class || typeInfo.type() == boolean.class) { return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> BooleanChunkInputStreamGenerator.extractChunkFromInputStream( options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); } - return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, - totalRows) -> ByteChunkInputStreamGenerator.extractChunkFromInputStream( - options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); + return new ByteChunkReader(options); case Short: - return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, - totalRows) -> ShortChunkInputStreamGenerator.extractChunkFromInputStream( - options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); + return new ShortChunkReader(options); case Int: - return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, - totalRows) -> IntChunkInputStreamGenerator.extractChunkFromInputStream( - options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); + return new IntChunkReader(options); case Long: if (factor == 1) { - return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, - totalRows) -> LongChunkInputStreamGenerator.extractChunkFromInputStream( - options, 
fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); + return new LongChunkReader(options); } - return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, - totalRows) -> LongChunkInputStreamGenerator.extractChunkFromInputStreamWithConversion( - options, - (long v) -> v == QueryConstants.NULL_LONG ? QueryConstants.NULL_LONG : (v * factor), - fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); + return new LongChunkReader(options, + (long v) -> v == QueryConstants.NULL_LONG ? QueryConstants.NULL_LONG : (v * factor)); case Float: - return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, - totalRows) -> FloatChunkInputStreamGenerator.extractChunkFromInputStream( - options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); + return new FloatChunkReader(options); case Double: - return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, - totalRows) -> DoubleChunkInputStreamGenerator.extractChunkFromInputStream( - options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); + return new DoubleChunkReader(options); case Object: if (typeInfo.type().isArray()) { if (typeInfo.componentType() == byte.class) { @@ -196,20 +179,12 @@ public ChunkReader extractChunkFromInputStream(StreamReaderOptions options, int fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); } if (typeInfo.type() == LocalDate.class) { - return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, - totalRows) -> LongChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform( - options, - value -> value == QueryConstants.NULL_LONG - ? null - : LocalDate.ofEpochDay(value / MS_PER_DAY), - fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); + return new LongChunkReader(options).transform(value -> value == QueryConstants.NULL_LONG ? null + : LocalDate.ofEpochDay(value / MS_PER_DAY)); } if (typeInfo.type() == LocalTime.class) { - return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, - totalRows) -> LongChunkInputStreamGenerator.extractChunkFromInputStreamWithTransform( - options, - value -> value == QueryConstants.NULL_LONG ? null : LocalTime.ofNanoOfDay(value), - fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows); + return new LongChunkReader(options).transform( + value -> value == QueryConstants.NULL_LONG ? 
null : LocalTime.ofNanoOfDay(value)); } if (typeInfo.type() == String.class || options.columnConversionMode().equals(ColumnConversionMode.Stringify)) { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkInputStreamGenerator.java index c5283a02364..a0046b67edb 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkInputStreamGenerator.java @@ -9,10 +9,7 @@ import java.util.function.ToDoubleFunction; -import io.deephaven.base.verify.Assert; import io.deephaven.chunk.ObjectChunk; -import io.deephaven.chunk.WritableChunk; -import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.chunk.util.pools.PoolableChunk; import io.deephaven.engine.rowset.RowSet; @@ -22,17 +19,11 @@ import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.DoubleChunk; import io.deephaven.chunk.WritableDoubleChunk; -import io.deephaven.chunk.WritableLongChunk; import io.deephaven.util.type.TypeUtils; import org.jetbrains.annotations.Nullable; -import java.io.DataInput; import java.io.IOException; import java.io.OutputStream; -import java.util.Iterator; -import java.util.PrimitiveIterator; -import java.util.function.Function; -import java.util.function.IntFunction; import static io.deephaven.util.QueryConstants.*; @@ -168,192 +159,4 @@ public int drainTo(final OutputStream outputStream) throws IOException { return LongSizedDataStructure.intSize("DoubleChunkInputStreamGenerator", bytesWritten); } } - - @FunctionalInterface - public interface DoubleConversion { - double apply(double in); - - DoubleConversion IDENTITY = (double a) -> a; - } - - static WritableDoubleChunk extractChunkFromInputStream( - final StreamReaderOptions options, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - return extractChunkFromInputStreamWithConversion( - options, DoubleConversion.IDENTITY, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, - totalRows); - } - - static WritableObjectChunk extractChunkFromInputStreamWithTransform( - final StreamReaderOptions options, - final Function transform, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - - try (final WritableDoubleChunk inner = extractChunkFromInputStream( - options, fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { - - final WritableObjectChunk chunk = castOrCreateChunk( - outChunk, - Math.max(totalRows, inner.size()), - WritableObjectChunk::makeWritableChunk, - WritableChunk::asWritableObjectChunk); - - if (outChunk == null) { - // if we're not given an output chunk then we better be writing at the front of the new one - Assert.eqZero(outOffset, "outOffset"); - } - - for (int ii = 0; ii < inner.size(); ++ii) { - double value = inner.get(ii); - chunk.set(outOffset + ii, transform.apply(value)); - } - - return chunk; - } - } - - static WritableDoubleChunk extractChunkFromInputStreamWithConversion( - final StreamReaderOptions options, - final DoubleConversion conversion, - final Iterator 
fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - - final FieldNodeInfo nodeInfo = fieldNodeIter.next(); - final long validityBuffer = bufferInfoIter.nextLong(); - final long payloadBuffer = bufferInfoIter.nextLong(); - - final WritableDoubleChunk chunk = castOrCreateChunk( - outChunk, - Math.max(totalRows, nodeInfo.numElements), - WritableDoubleChunk::makeWritableChunk, - WritableChunk::asWritableDoubleChunk); - - if (nodeInfo.numElements == 0) { - return chunk; - } - - final int numValidityLongs = options.useDeephavenNulls() ? 0 : (nodeInfo.numElements + 63) / 64; - try (final WritableLongChunk isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) { - if (options.useDeephavenNulls() && validityBuffer != 0) { - throw new IllegalStateException("validity buffer is non-empty, but is unnecessary"); - } - int jj = 0; - for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) { - isValid.set(jj, is.readLong()); - } - final long valBufRead = jj * 8L; - if (valBufRead < validityBuffer) { - is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead)); - } - // we support short validity buffers - for (; jj < numValidityLongs; ++jj) { - isValid.set(jj, -1); // -1 is bit-wise representation of all ones - } - // consumed entire validity buffer by here - - final long payloadRead = (long) nodeInfo.numElements * Double.BYTES; - Assert.geq(payloadBuffer, "payloadBuffer", payloadRead, "payloadRead"); - - if (options.useDeephavenNulls()) { - useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset); - } else { - useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid); - } - - final long overhangPayload = payloadBuffer - payloadRead; - if (overhangPayload > 0) { - is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload)); - } - } - - return chunk; - } - - private static > T castOrCreateChunk( - final WritableChunk outChunk, - final int numRows, - final IntFunction chunkFactory, - final Function, T> castFunction) { - if (outChunk != null) { - return castFunction.apply(outChunk); - } - final T newChunk = chunkFactory.apply(numRows); - newChunk.setSize(numRows); - return newChunk; - } - - private static void useDeephavenNulls( - final DoubleConversion conversion, - final DataInput is, - final FieldNodeInfo nodeInfo, - final WritableDoubleChunk chunk, - final int offset) throws IOException { - if (conversion == DoubleConversion.IDENTITY) { - for (int ii = 0; ii < nodeInfo.numElements; ++ii) { - chunk.set(offset + ii, is.readDouble()); - } - } else { - for (int ii = 0; ii < nodeInfo.numElements; ++ii) { - final double in = is.readDouble(); - final double out = in == NULL_DOUBLE ? 
in : conversion.apply(in); - chunk.set(offset + ii, out); - } - } - } - - private static void useValidityBuffer( - final DoubleConversion conversion, - final DataInput is, - final FieldNodeInfo nodeInfo, - final WritableDoubleChunk chunk, - final int offset, - final WritableLongChunk isValid) throws IOException { - final int numElements = nodeInfo.numElements; - final int numValidityWords = (numElements + 63) / 64; - - int ei = 0; - int pendingSkips = 0; - - for (int vi = 0; vi < numValidityWords; ++vi) { - int bitsLeftInThisWord = Math.min(64, numElements - vi * 64); - long validityWord = isValid.get(vi); - do { - if ((validityWord & 1) == 1) { - if (pendingSkips > 0) { - is.skipBytes(pendingSkips * Double.BYTES); - chunk.fillWithNullValue(offset + ei, pendingSkips); - ei += pendingSkips; - pendingSkips = 0; - } - chunk.set(offset + ei++, conversion.apply(is.readDouble())); - validityWord >>= 1; - bitsLeftInThisWord--; - } else { - final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord); - pendingSkips += skips; - validityWord >>= skips; - bitsLeftInThisWord -= skips; - } - } while (bitsLeftInThisWord > 0); - } - - if (pendingSkips > 0) { - is.skipBytes(pendingSkips * Double.BYTES); - chunk.fillWithNullValue(offset + ei, pendingSkips); - } - } } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java new file mode 100644 index 00000000000..4b72273272b --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DoubleChunkReader.java @@ -0,0 +1,204 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +// ****** AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY +// ****** Edit CharChunkReader and run "./gradlew replicateBarrageUtils" to regenerate +// +// @formatter:off +package io.deephaven.extensions.barrage.chunk; + +import io.deephaven.base.verify.Assert; +import io.deephaven.chunk.WritableDoubleChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.extensions.barrage.util.StreamReaderOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; + +import java.io.DataInput; +import java.io.IOException; +import java.util.Iterator; +import java.util.PrimitiveIterator; +import java.util.function.Function; +import java.util.function.IntFunction; + +import static io.deephaven.util.QueryConstants.NULL_DOUBLE; + +public class DoubleChunkReader implements ChunkReader { + private static final String DEBUG_NAME = "DoubleChunkReader"; + private final StreamReaderOptions options; + private final DoubleConversion conversion; + + @FunctionalInterface + public interface DoubleConversion { + double apply(double in); + + DoubleConversion IDENTITY = (double a) -> a; + } + + public DoubleChunkReader(StreamReaderOptions options) { + this(options, DoubleConversion.IDENTITY); + } + + public DoubleChunkReader(StreamReaderOptions options, DoubleConversion conversion) { + this.options = options; + this.conversion = conversion; + } + + public ChunkReader transform(Function transform) { + return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> { + try (final WritableDoubleChunk inner = DoubleChunkReader.this.read( + fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { + + final WritableObjectChunk chunk = 
castOrCreateChunk( + outChunk, + Math.max(totalRows, inner.size()), + WritableObjectChunk::makeWritableChunk, + WritableChunk::asWritableObjectChunk); + + if (outChunk == null) { + // if we're not given an output chunk then we better be writing at the front of the new one + Assert.eqZero(outOffset, "outOffset"); + } + + for (int ii = 0; ii < inner.size(); ++ii) { + double value = inner.get(ii); + chunk.set(outOffset + ii, transform.apply(value)); + } + + return chunk; + } + }; + } + + @Override + public WritableDoubleChunk read(Iterator fieldNodeIter, + PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk outChunk, int outOffset, + int totalRows) throws IOException { + + final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next(); + final long validityBuffer = bufferInfoIter.nextLong(); + final long payloadBuffer = bufferInfoIter.nextLong(); + + final WritableDoubleChunk chunk = castOrCreateChunk( + outChunk, + Math.max(totalRows, nodeInfo.numElements), + WritableDoubleChunk::makeWritableChunk, + WritableChunk::asWritableDoubleChunk); + + if (nodeInfo.numElements == 0) { + return chunk; + } + + final int numValidityLongs = options.useDeephavenNulls() ? 0 : (nodeInfo.numElements + 63) / 64; + try (final WritableLongChunk isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) { + if (options.useDeephavenNulls() && validityBuffer != 0) { + throw new IllegalStateException("validity buffer is non-empty, but is unnecessary"); + } + int jj = 0; + for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) { + isValid.set(jj, is.readLong()); + } + final long valBufRead = jj * 8L; + if (valBufRead < validityBuffer) { + is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead)); + } + // we support short validity buffers + for (; jj < numValidityLongs; ++jj) { + isValid.set(jj, -1); // -1 is bit-wise representation of all ones + } + // consumed entire validity buffer by here + + final long payloadRead = (long) nodeInfo.numElements * Double.BYTES; + Assert.geq(payloadBuffer, "payloadBuffer", payloadRead, "payloadRead"); + + if (options.useDeephavenNulls()) { + useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset); + } else { + useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid); + } + + final long overhangPayload = payloadBuffer - payloadRead; + if (overhangPayload > 0) { + is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload)); + } + } + + return chunk; + } + + private static > T castOrCreateChunk( + final WritableChunk outChunk, + final int numRows, + final IntFunction chunkFactory, + final Function, T> castFunction) { + if (outChunk != null) { + return castFunction.apply(outChunk); + } + final T newChunk = chunkFactory.apply(numRows); + newChunk.setSize(numRows); + return newChunk; + } + + private static void useDeephavenNulls( + final DoubleConversion conversion, + final DataInput is, + final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo, + final WritableDoubleChunk chunk, + final int offset) throws IOException { + if (conversion == DoubleConversion.IDENTITY) { + for (int ii = 0; ii < nodeInfo.numElements; ++ii) { + chunk.set(offset + ii, is.readDouble()); + } + } else { + for (int ii = 0; ii < nodeInfo.numElements; ++ii) { + final double in = is.readDouble(); + final double out = in == NULL_DOUBLE ? 
in : conversion.apply(in); + chunk.set(offset + ii, out); + } + } + } + + private static void useValidityBuffer( + final DoubleConversion conversion, + final DataInput is, + final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo, + final WritableDoubleChunk chunk, + final int offset, + final WritableLongChunk isValid) throws IOException { + final int numElements = nodeInfo.numElements; + final int numValidityWords = (numElements + 63) / 64; + + int ei = 0; + int pendingSkips = 0; + + for (int vi = 0; vi < numValidityWords; ++vi) { + int bitsLeftInThisWord = Math.min(64, numElements - vi * 64); + long validityWord = isValid.get(vi); + do { + if ((validityWord & 1) == 1) { + if (pendingSkips > 0) { + is.skipBytes(pendingSkips * Double.BYTES); + chunk.fillWithNullValue(offset + ei, pendingSkips); + ei += pendingSkips; + pendingSkips = 0; + } + chunk.set(offset + ei++, conversion.apply(is.readDouble())); + validityWord >>= 1; + bitsLeftInThisWord--; + } else { + final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord); + pendingSkips += skips; + validityWord >>= skips; + bitsLeftInThisWord -= skips; + } + } while (bitsLeftInThisWord > 0); + } + + if (pendingSkips > 0) { + is.skipBytes(pendingSkips * Double.BYTES); + chunk.fillWithNullValue(offset + ei, pendingSkips); + } + } +} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkInputStreamGenerator.java index 19b52593bff..edd8aaccb2a 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkInputStreamGenerator.java @@ -7,10 +7,7 @@ // @formatter:off package io.deephaven.extensions.barrage.chunk; -import io.deephaven.base.verify.Assert; import io.deephaven.chunk.ObjectChunk; -import io.deephaven.chunk.WritableChunk; -import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.chunk.util.pools.PoolableChunk; import io.deephaven.engine.primitive.function.ToFloatFunction; @@ -21,17 +18,11 @@ import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.FloatChunk; import io.deephaven.chunk.WritableFloatChunk; -import io.deephaven.chunk.WritableLongChunk; import io.deephaven.util.type.TypeUtils; import org.jetbrains.annotations.Nullable; -import java.io.DataInput; import java.io.IOException; import java.io.OutputStream; -import java.util.Iterator; -import java.util.PrimitiveIterator; -import java.util.function.Function; -import java.util.function.IntFunction; import static io.deephaven.util.QueryConstants.*; @@ -167,192 +158,4 @@ public int drainTo(final OutputStream outputStream) throws IOException { return LongSizedDataStructure.intSize("FloatChunkInputStreamGenerator", bytesWritten); } } - - @FunctionalInterface - public interface FloatConversion { - float apply(float in); - - FloatConversion IDENTITY = (float a) -> a; - } - - static WritableFloatChunk extractChunkFromInputStream( - final StreamReaderOptions options, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - return extractChunkFromInputStreamWithConversion( - options, FloatConversion.IDENTITY, fieldNodeIter, bufferInfoIter, is, 
outChunk, outOffset, - totalRows); - } - - static WritableObjectChunk extractChunkFromInputStreamWithTransform( - final StreamReaderOptions options, - final Function transform, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - - try (final WritableFloatChunk inner = extractChunkFromInputStream( - options, fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { - - final WritableObjectChunk chunk = castOrCreateChunk( - outChunk, - Math.max(totalRows, inner.size()), - WritableObjectChunk::makeWritableChunk, - WritableChunk::asWritableObjectChunk); - - if (outChunk == null) { - // if we're not given an output chunk then we better be writing at the front of the new one - Assert.eqZero(outOffset, "outOffset"); - } - - for (int ii = 0; ii < inner.size(); ++ii) { - float value = inner.get(ii); - chunk.set(outOffset + ii, transform.apply(value)); - } - - return chunk; - } - } - - static WritableFloatChunk extractChunkFromInputStreamWithConversion( - final StreamReaderOptions options, - final FloatConversion conversion, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - - final FieldNodeInfo nodeInfo = fieldNodeIter.next(); - final long validityBuffer = bufferInfoIter.nextLong(); - final long payloadBuffer = bufferInfoIter.nextLong(); - - final WritableFloatChunk chunk = castOrCreateChunk( - outChunk, - Math.max(totalRows, nodeInfo.numElements), - WritableFloatChunk::makeWritableChunk, - WritableChunk::asWritableFloatChunk); - - if (nodeInfo.numElements == 0) { - return chunk; - } - - final int numValidityLongs = options.useDeephavenNulls() ? 
0 : (nodeInfo.numElements + 63) / 64; - try (final WritableLongChunk isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) { - if (options.useDeephavenNulls() && validityBuffer != 0) { - throw new IllegalStateException("validity buffer is non-empty, but is unnecessary"); - } - int jj = 0; - for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) { - isValid.set(jj, is.readLong()); - } - final long valBufRead = jj * 8L; - if (valBufRead < validityBuffer) { - is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead)); - } - // we support short validity buffers - for (; jj < numValidityLongs; ++jj) { - isValid.set(jj, -1); // -1 is bit-wise representation of all ones - } - // consumed entire validity buffer by here - - final long payloadRead = (long) nodeInfo.numElements * Float.BYTES; - Assert.geq(payloadBuffer, "payloadBuffer", payloadRead, "payloadRead"); - - if (options.useDeephavenNulls()) { - useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset); - } else { - useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid); - } - - final long overhangPayload = payloadBuffer - payloadRead; - if (overhangPayload > 0) { - is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload)); - } - } - - return chunk; - } - - private static > T castOrCreateChunk( - final WritableChunk outChunk, - final int numRows, - final IntFunction chunkFactory, - final Function, T> castFunction) { - if (outChunk != null) { - return castFunction.apply(outChunk); - } - final T newChunk = chunkFactory.apply(numRows); - newChunk.setSize(numRows); - return newChunk; - } - - private static void useDeephavenNulls( - final FloatConversion conversion, - final DataInput is, - final FieldNodeInfo nodeInfo, - final WritableFloatChunk chunk, - final int offset) throws IOException { - if (conversion == FloatConversion.IDENTITY) { - for (int ii = 0; ii < nodeInfo.numElements; ++ii) { - chunk.set(offset + ii, is.readFloat()); - } - } else { - for (int ii = 0; ii < nodeInfo.numElements; ++ii) { - final float in = is.readFloat(); - final float out = in == NULL_FLOAT ? 
in : conversion.apply(in); - chunk.set(offset + ii, out); - } - } - } - - private static void useValidityBuffer( - final FloatConversion conversion, - final DataInput is, - final FieldNodeInfo nodeInfo, - final WritableFloatChunk chunk, - final int offset, - final WritableLongChunk isValid) throws IOException { - final int numElements = nodeInfo.numElements; - final int numValidityWords = (numElements + 63) / 64; - - int ei = 0; - int pendingSkips = 0; - - for (int vi = 0; vi < numValidityWords; ++vi) { - int bitsLeftInThisWord = Math.min(64, numElements - vi * 64); - long validityWord = isValid.get(vi); - do { - if ((validityWord & 1) == 1) { - if (pendingSkips > 0) { - is.skipBytes(pendingSkips * Float.BYTES); - chunk.fillWithNullValue(offset + ei, pendingSkips); - ei += pendingSkips; - pendingSkips = 0; - } - chunk.set(offset + ei++, conversion.apply(is.readFloat())); - validityWord >>= 1; - bitsLeftInThisWord--; - } else { - final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord); - pendingSkips += skips; - validityWord >>= skips; - bitsLeftInThisWord -= skips; - } - } while (bitsLeftInThisWord > 0); - } - - if (pendingSkips > 0) { - is.skipBytes(pendingSkips * Float.BYTES); - chunk.fillWithNullValue(offset + ei, pendingSkips); - } - } } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkReader.java new file mode 100644 index 00000000000..6d434226235 --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/FloatChunkReader.java @@ -0,0 +1,204 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +// ****** AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY +// ****** Edit CharChunkReader and run "./gradlew replicateBarrageUtils" to regenerate +// +// @formatter:off +package io.deephaven.extensions.barrage.chunk; + +import io.deephaven.base.verify.Assert; +import io.deephaven.chunk.WritableFloatChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.extensions.barrage.util.StreamReaderOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; + +import java.io.DataInput; +import java.io.IOException; +import java.util.Iterator; +import java.util.PrimitiveIterator; +import java.util.function.Function; +import java.util.function.IntFunction; + +import static io.deephaven.util.QueryConstants.NULL_FLOAT; + +public class FloatChunkReader implements ChunkReader { + private static final String DEBUG_NAME = "FloatChunkReader"; + private final StreamReaderOptions options; + private final FloatConversion conversion; + + @FunctionalInterface + public interface FloatConversion { + float apply(float in); + + FloatConversion IDENTITY = (float a) -> a; + } + + public FloatChunkReader(StreamReaderOptions options) { + this(options, FloatConversion.IDENTITY); + } + + public FloatChunkReader(StreamReaderOptions options, FloatConversion conversion) { + this.options = options; + this.conversion = conversion; + } + + public ChunkReader transform(Function transform) { + return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> { + try (final WritableFloatChunk inner = FloatChunkReader.this.read( + fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { + + final WritableObjectChunk chunk = castOrCreateChunk( + 
+                        outChunk,
+                        Math.max(totalRows, inner.size()),
+                        WritableObjectChunk::makeWritableChunk,
+                        WritableChunk::asWritableObjectChunk);
+
+                if (outChunk == null) {
+                    // if we're not given an output chunk then we better be writing at the front of the new one
+                    Assert.eqZero(outOffset, "outOffset");
+                }
+
+                for (int ii = 0; ii < inner.size(); ++ii) {
+                    float value = inner.get(ii);
+                    chunk.set(outOffset + ii, transform.apply(value));
+                }
+
+                return chunk;
+            }
+        };
+    }
+
+    @Override
+    public WritableFloatChunk<Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
+            PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
+            int totalRows) throws IOException {
+
+        final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next();
+        final long validityBuffer = bufferInfoIter.nextLong();
+        final long payloadBuffer = bufferInfoIter.nextLong();
+
+        final WritableFloatChunk<Values> chunk = castOrCreateChunk(
+                outChunk,
+                Math.max(totalRows, nodeInfo.numElements),
+                WritableFloatChunk::makeWritableChunk,
+                WritableChunk::asWritableFloatChunk);
+
+        if (nodeInfo.numElements == 0) {
+            return chunk;
+        }
+
+        final int numValidityLongs = options.useDeephavenNulls() ? 0 : (nodeInfo.numElements + 63) / 64;
+        try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) {
+            if (options.useDeephavenNulls() && validityBuffer != 0) {
+                throw new IllegalStateException("validity buffer is non-empty, but is unnecessary");
+            }
+            int jj = 0;
+            for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
+                isValid.set(jj, is.readLong());
+            }
+            final long valBufRead = jj * 8L;
+            if (valBufRead < validityBuffer) {
+                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
+            }
+            // we support short validity buffers
+            for (; jj < numValidityLongs; ++jj) {
+                isValid.set(jj, -1); // -1 is bit-wise representation of all ones
+            }
+            // consumed entire validity buffer by here
+
+            final long payloadRead = (long) nodeInfo.numElements * Float.BYTES;
+            Assert.geq(payloadBuffer, "payloadBuffer", payloadRead, "payloadRead");
+
+            if (options.useDeephavenNulls()) {
+                useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset);
+            } else {
+                useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid);
+            }
+
+            final long overhangPayload = payloadBuffer - payloadRead;
+            if (overhangPayload > 0) {
+                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload));
+            }
+        }
+
+        return chunk;
+    }
+
+    private static <T extends WritableChunk<Values>> T castOrCreateChunk(
+            final WritableChunk<Values> outChunk,
+            final int numRows,
+            final IntFunction<T> chunkFactory,
+            final Function<WritableChunk<Values>, T> castFunction) {
+        if (outChunk != null) {
+            return castFunction.apply(outChunk);
+        }
+        final T newChunk = chunkFactory.apply(numRows);
+        newChunk.setSize(numRows);
+        return newChunk;
+    }
+
+    private static void useDeephavenNulls(
+            final FloatConversion conversion,
+            final DataInput is,
+            final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo,
+            final WritableFloatChunk<Values> chunk,
+            final int offset) throws IOException {
+        if (conversion == FloatConversion.IDENTITY) {
+            for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
+                chunk.set(offset + ii, is.readFloat());
+            }
+        } else {
+            for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
+                final float in = is.readFloat();
+                final float out = in == NULL_FLOAT ?
in : conversion.apply(in); + chunk.set(offset + ii, out); + } + } + } + + private static void useValidityBuffer( + final FloatConversion conversion, + final DataInput is, + final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo, + final WritableFloatChunk chunk, + final int offset, + final WritableLongChunk isValid) throws IOException { + final int numElements = nodeInfo.numElements; + final int numValidityWords = (numElements + 63) / 64; + + int ei = 0; + int pendingSkips = 0; + + for (int vi = 0; vi < numValidityWords; ++vi) { + int bitsLeftInThisWord = Math.min(64, numElements - vi * 64); + long validityWord = isValid.get(vi); + do { + if ((validityWord & 1) == 1) { + if (pendingSkips > 0) { + is.skipBytes(pendingSkips * Float.BYTES); + chunk.fillWithNullValue(offset + ei, pendingSkips); + ei += pendingSkips; + pendingSkips = 0; + } + chunk.set(offset + ei++, conversion.apply(is.readFloat())); + validityWord >>= 1; + bitsLeftInThisWord--; + } else { + final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord); + pendingSkips += skips; + validityWord >>= skips; + bitsLeftInThisWord -= skips; + } + } while (bitsLeftInThisWord > 0); + } + + if (pendingSkips > 0) { + is.skipBytes(pendingSkips * Float.BYTES); + chunk.fillWithNullValue(offset + ei, pendingSkips); + } + } +} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkInputStreamGenerator.java index 91714f4dd43..87bc61b8c6d 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkInputStreamGenerator.java @@ -9,10 +9,7 @@ import java.util.function.ToIntFunction; -import io.deephaven.base.verify.Assert; import io.deephaven.chunk.ObjectChunk; -import io.deephaven.chunk.WritableChunk; -import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.chunk.util.pools.PoolableChunk; import io.deephaven.engine.rowset.RowSet; @@ -22,17 +19,11 @@ import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.IntChunk; import io.deephaven.chunk.WritableIntChunk; -import io.deephaven.chunk.WritableLongChunk; import io.deephaven.util.type.TypeUtils; import org.jetbrains.annotations.Nullable; -import java.io.DataInput; import java.io.IOException; import java.io.OutputStream; -import java.util.Iterator; -import java.util.PrimitiveIterator; -import java.util.function.Function; -import java.util.function.IntFunction; import static io.deephaven.util.QueryConstants.*; @@ -168,192 +159,4 @@ public int drainTo(final OutputStream outputStream) throws IOException { return LongSizedDataStructure.intSize("IntChunkInputStreamGenerator", bytesWritten); } } - - @FunctionalInterface - public interface IntConversion { - int apply(int in); - - IntConversion IDENTITY = (int a) -> a; - } - - static WritableIntChunk extractChunkFromInputStream( - final StreamReaderOptions options, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - return extractChunkFromInputStreamWithConversion( - options, IntConversion.IDENTITY, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, - totalRows); - } - - static WritableObjectChunk 
extractChunkFromInputStreamWithTransform( - final StreamReaderOptions options, - final Function transform, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - - try (final WritableIntChunk inner = extractChunkFromInputStream( - options, fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { - - final WritableObjectChunk chunk = castOrCreateChunk( - outChunk, - Math.max(totalRows, inner.size()), - WritableObjectChunk::makeWritableChunk, - WritableChunk::asWritableObjectChunk); - - if (outChunk == null) { - // if we're not given an output chunk then we better be writing at the front of the new one - Assert.eqZero(outOffset, "outOffset"); - } - - for (int ii = 0; ii < inner.size(); ++ii) { - int value = inner.get(ii); - chunk.set(outOffset + ii, transform.apply(value)); - } - - return chunk; - } - } - - static WritableIntChunk extractChunkFromInputStreamWithConversion( - final StreamReaderOptions options, - final IntConversion conversion, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - - final FieldNodeInfo nodeInfo = fieldNodeIter.next(); - final long validityBuffer = bufferInfoIter.nextLong(); - final long payloadBuffer = bufferInfoIter.nextLong(); - - final WritableIntChunk chunk = castOrCreateChunk( - outChunk, - Math.max(totalRows, nodeInfo.numElements), - WritableIntChunk::makeWritableChunk, - WritableChunk::asWritableIntChunk); - - if (nodeInfo.numElements == 0) { - return chunk; - } - - final int numValidityLongs = options.useDeephavenNulls() ? 0 : (nodeInfo.numElements + 63) / 64; - try (final WritableLongChunk isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) { - if (options.useDeephavenNulls() && validityBuffer != 0) { - throw new IllegalStateException("validity buffer is non-empty, but is unnecessary"); - } - int jj = 0; - for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) { - isValid.set(jj, is.readLong()); - } - final long valBufRead = jj * 8L; - if (valBufRead < validityBuffer) { - is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead)); - } - // we support short validity buffers - for (; jj < numValidityLongs; ++jj) { - isValid.set(jj, -1); // -1 is bit-wise representation of all ones - } - // consumed entire validity buffer by here - - final long payloadRead = (long) nodeInfo.numElements * Integer.BYTES; - Assert.geq(payloadBuffer, "payloadBuffer", payloadRead, "payloadRead"); - - if (options.useDeephavenNulls()) { - useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset); - } else { - useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid); - } - - final long overhangPayload = payloadBuffer - payloadRead; - if (overhangPayload > 0) { - is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload)); - } - } - - return chunk; - } - - private static > T castOrCreateChunk( - final WritableChunk outChunk, - final int numRows, - final IntFunction chunkFactory, - final Function, T> castFunction) { - if (outChunk != null) { - return castFunction.apply(outChunk); - } - final T newChunk = chunkFactory.apply(numRows); - newChunk.setSize(numRows); - return newChunk; - } - - private static void useDeephavenNulls( - final IntConversion conversion, - final DataInput is, - final FieldNodeInfo nodeInfo, 
- final WritableIntChunk chunk, - final int offset) throws IOException { - if (conversion == IntConversion.IDENTITY) { - for (int ii = 0; ii < nodeInfo.numElements; ++ii) { - chunk.set(offset + ii, is.readInt()); - } - } else { - for (int ii = 0; ii < nodeInfo.numElements; ++ii) { - final int in = is.readInt(); - final int out = in == NULL_INT ? in : conversion.apply(in); - chunk.set(offset + ii, out); - } - } - } - - private static void useValidityBuffer( - final IntConversion conversion, - final DataInput is, - final FieldNodeInfo nodeInfo, - final WritableIntChunk chunk, - final int offset, - final WritableLongChunk isValid) throws IOException { - final int numElements = nodeInfo.numElements; - final int numValidityWords = (numElements + 63) / 64; - - int ei = 0; - int pendingSkips = 0; - - for (int vi = 0; vi < numValidityWords; ++vi) { - int bitsLeftInThisWord = Math.min(64, numElements - vi * 64); - long validityWord = isValid.get(vi); - do { - if ((validityWord & 1) == 1) { - if (pendingSkips > 0) { - is.skipBytes(pendingSkips * Integer.BYTES); - chunk.fillWithNullValue(offset + ei, pendingSkips); - ei += pendingSkips; - pendingSkips = 0; - } - chunk.set(offset + ei++, conversion.apply(is.readInt())); - validityWord >>= 1; - bitsLeftInThisWord--; - } else { - final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord); - pendingSkips += skips; - validityWord >>= skips; - bitsLeftInThisWord -= skips; - } - } while (bitsLeftInThisWord > 0); - } - - if (pendingSkips > 0) { - is.skipBytes(pendingSkips * Integer.BYTES); - chunk.fillWithNullValue(offset + ei, pendingSkips); - } - } } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java new file mode 100644 index 00000000000..39bce48735c --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/IntChunkReader.java @@ -0,0 +1,204 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +// ****** AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY +// ****** Edit CharChunkReader and run "./gradlew replicateBarrageUtils" to regenerate +// +// @formatter:off +package io.deephaven.extensions.barrage.chunk; + +import io.deephaven.base.verify.Assert; +import io.deephaven.chunk.WritableIntChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.extensions.barrage.util.StreamReaderOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; + +import java.io.DataInput; +import java.io.IOException; +import java.util.Iterator; +import java.util.PrimitiveIterator; +import java.util.function.Function; +import java.util.function.IntFunction; + +import static io.deephaven.util.QueryConstants.NULL_INT; + +public class IntChunkReader implements ChunkReader { + private static final String DEBUG_NAME = "IntChunkReader"; + private final StreamReaderOptions options; + private final IntConversion conversion; + + @FunctionalInterface + public interface IntConversion { + int apply(int in); + + IntConversion IDENTITY = (int a) -> a; + } + + public IntChunkReader(StreamReaderOptions options) { + this(options, IntConversion.IDENTITY); + } + + public IntChunkReader(StreamReaderOptions options, IntConversion conversion) { + this.options = options; + this.conversion = conversion; + } + + 
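// Usage sketch (annotation, assuming the surrounding Barrage plumbing supplies the field-node
// and buffer iterators plus the DataInput; names here are hypothetical, not from this patch):
//
//     final IntChunkReader reader = new IntChunkReader(options);
//     try (final WritableIntChunk<Values> chunk =
//             reader.read(fieldNodeIter, bufferInfoIter, dataInput, null, 0, totalRows)) {
//         // passing outChunk == null allocates a fresh chunk sized for totalRows
//     }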
+    public <T> ChunkReader transform(Function<Integer, T> transform) {
+        return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> {
+            try (final WritableIntChunk<Values> inner = IntChunkReader.this.read(
+                    fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {
+
+                final WritableObjectChunk<T, Values> chunk = castOrCreateChunk(
+                        outChunk,
+                        Math.max(totalRows, inner.size()),
+                        WritableObjectChunk::makeWritableChunk,
+                        WritableChunk::asWritableObjectChunk);
+
+                if (outChunk == null) {
+                    // if we're not given an output chunk then we better be writing at the front of the new one
+                    Assert.eqZero(outOffset, "outOffset");
+                }
+
+                for (int ii = 0; ii < inner.size(); ++ii) {
+                    int value = inner.get(ii);
+                    chunk.set(outOffset + ii, transform.apply(value));
+                }
+
+                return chunk;
+            }
+        };
+    }
+
+    @Override
+    public WritableIntChunk<Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
+            PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
+            int totalRows) throws IOException {
+
+        final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next();
+        final long validityBuffer = bufferInfoIter.nextLong();
+        final long payloadBuffer = bufferInfoIter.nextLong();
+
+        final WritableIntChunk<Values> chunk = castOrCreateChunk(
+                outChunk,
+                Math.max(totalRows, nodeInfo.numElements),
+                WritableIntChunk::makeWritableChunk,
+                WritableChunk::asWritableIntChunk);
+
+        if (nodeInfo.numElements == 0) {
+            return chunk;
+        }
+
+        final int numValidityLongs = options.useDeephavenNulls() ? 0 : (nodeInfo.numElements + 63) / 64;
+        try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) {
+            if (options.useDeephavenNulls() && validityBuffer != 0) {
+                throw new IllegalStateException("validity buffer is non-empty, but is unnecessary");
+            }
+            int jj = 0;
+            for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
+                isValid.set(jj, is.readLong());
+            }
+            final long valBufRead = jj * 8L;
+            if (valBufRead < validityBuffer) {
+                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
+            }
+            // we support short validity buffers
+            for (; jj < numValidityLongs; ++jj) {
+                isValid.set(jj, -1); // -1 is bit-wise representation of all ones
+            }
+            // consumed entire validity buffer by here
+
+            final long payloadRead = (long) nodeInfo.numElements * Integer.BYTES;
+            Assert.geq(payloadBuffer, "payloadBuffer", payloadRead, "payloadRead");
+
+            if (options.useDeephavenNulls()) {
+                useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset);
+            } else {
+                useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid);
+            }
+
+            final long overhangPayload = payloadBuffer - payloadRead;
+            if (overhangPayload > 0) {
+                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload));
+            }
+        }
+
+        return chunk;
+    }
+
+    private static <T extends WritableChunk<Values>> T castOrCreateChunk(
+            final WritableChunk<Values> outChunk,
+            final int numRows,
+            final IntFunction<T> chunkFactory,
+            final Function<WritableChunk<Values>, T> castFunction) {
+        if (outChunk != null) {
+            return castFunction.apply(outChunk);
+        }
+        final T newChunk = chunkFactory.apply(numRows);
+        newChunk.setSize(numRows);
+        return newChunk;
+    }
+
+    private static void useDeephavenNulls(
+            final IntConversion conversion,
+            final DataInput is,
+            final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo,
+            final WritableIntChunk<Values> chunk,
+            final int offset) throws IOException {
+        if (conversion == IntConversion.IDENTITY) {
+            for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
+                chunk.set(offset + ii, is.readInt());
+            }
+        } else {
+            for (int ii = 0; ii < nodeInfo.numElements;
++ii) { + final int in = is.readInt(); + final int out = in == NULL_INT ? in : conversion.apply(in); + chunk.set(offset + ii, out); + } + } + } + + private static void useValidityBuffer( + final IntConversion conversion, + final DataInput is, + final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo, + final WritableIntChunk chunk, + final int offset, + final WritableLongChunk isValid) throws IOException { + final int numElements = nodeInfo.numElements; + final int numValidityWords = (numElements + 63) / 64; + + int ei = 0; + int pendingSkips = 0; + + for (int vi = 0; vi < numValidityWords; ++vi) { + int bitsLeftInThisWord = Math.min(64, numElements - vi * 64); + long validityWord = isValid.get(vi); + do { + if ((validityWord & 1) == 1) { + if (pendingSkips > 0) { + is.skipBytes(pendingSkips * Integer.BYTES); + chunk.fillWithNullValue(offset + ei, pendingSkips); + ei += pendingSkips; + pendingSkips = 0; + } + chunk.set(offset + ei++, conversion.apply(is.readInt())); + validityWord >>= 1; + bitsLeftInThisWord--; + } else { + final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord); + pendingSkips += skips; + validityWord >>= skips; + bitsLeftInThisWord -= skips; + } + } while (bitsLeftInThisWord > 0); + } + + if (pendingSkips > 0) { + is.skipBytes(pendingSkips * Integer.BYTES); + chunk.fillWithNullValue(offset + ei, pendingSkips); + } + } +} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkInputStreamGenerator.java index a28c4006d1d..671d972ccce 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkInputStreamGenerator.java @@ -9,10 +9,7 @@ import java.util.function.ToLongFunction; -import io.deephaven.base.verify.Assert; import io.deephaven.chunk.ObjectChunk; -import io.deephaven.chunk.WritableChunk; -import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.chunk.util.pools.PoolableChunk; import io.deephaven.engine.rowset.RowSet; @@ -22,17 +19,11 @@ import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.LongChunk; import io.deephaven.chunk.WritableLongChunk; -import io.deephaven.chunk.WritableLongChunk; import io.deephaven.util.type.TypeUtils; import org.jetbrains.annotations.Nullable; -import java.io.DataInput; import java.io.IOException; import java.io.OutputStream; -import java.util.Iterator; -import java.util.PrimitiveIterator; -import java.util.function.Function; -import java.util.function.IntFunction; import static io.deephaven.util.QueryConstants.*; @@ -168,192 +159,4 @@ public int drainTo(final OutputStream outputStream) throws IOException { return LongSizedDataStructure.intSize("LongChunkInputStreamGenerator", bytesWritten); } } - - @FunctionalInterface - public interface LongConversion { - long apply(long in); - - LongConversion IDENTITY = (long a) -> a; - } - - static WritableLongChunk extractChunkFromInputStream( - final StreamReaderOptions options, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - return extractChunkFromInputStreamWithConversion( - options, LongConversion.IDENTITY, fieldNodeIter, 
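// (annotation, not part of the patch) the conversion-free overload simply delegates with
// LongConversion.IDENTITY, so all of the stream handling lives in
// extractChunkFromInputStreamWithConversion.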
bufferInfoIter, is, outChunk, outOffset, - totalRows); - } - - static WritableObjectChunk extractChunkFromInputStreamWithTransform( - final StreamReaderOptions options, - final Function transform, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - - try (final WritableLongChunk inner = extractChunkFromInputStream( - options, fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { - - final WritableObjectChunk chunk = castOrCreateChunk( - outChunk, - Math.max(totalRows, inner.size()), - WritableObjectChunk::makeWritableChunk, - WritableChunk::asWritableObjectChunk); - - if (outChunk == null) { - // if we're not given an output chunk then we better be writing at the front of the new one - Assert.eqZero(outOffset, "outOffset"); - } - - for (int ii = 0; ii < inner.size(); ++ii) { - long value = inner.get(ii); - chunk.set(outOffset + ii, transform.apply(value)); - } - - return chunk; - } - } - - static WritableLongChunk extractChunkFromInputStreamWithConversion( - final StreamReaderOptions options, - final LongConversion conversion, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - - final FieldNodeInfo nodeInfo = fieldNodeIter.next(); - final long validityBuffer = bufferInfoIter.nextLong(); - final long payloadBuffer = bufferInfoIter.nextLong(); - - final WritableLongChunk chunk = castOrCreateChunk( - outChunk, - Math.max(totalRows, nodeInfo.numElements), - WritableLongChunk::makeWritableChunk, - WritableChunk::asWritableLongChunk); - - if (nodeInfo.numElements == 0) { - return chunk; - } - - final int numValidityLongs = options.useDeephavenNulls() ? 
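// (annotation, not part of the patch) with Deephaven nulls on the wire there is no validity
// bitmap to read, so zero validity words are needed; otherwise one validity bit per element is
// packed into 64-bit longs, and the expression below rounds numElements up to a whole word count.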
0 : (nodeInfo.numElements + 63) / 64; - try (final WritableLongChunk isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) { - if (options.useDeephavenNulls() && validityBuffer != 0) { - throw new IllegalStateException("validity buffer is non-empty, but is unnecessary"); - } - int jj = 0; - for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) { - isValid.set(jj, is.readLong()); - } - final long valBufRead = jj * 8L; - if (valBufRead < validityBuffer) { - is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead)); - } - // we support short validity buffers - for (; jj < numValidityLongs; ++jj) { - isValid.set(jj, -1); // -1 is bit-wise representation of all ones - } - // consumed entire validity buffer by here - - final long payloadRead = (long) nodeInfo.numElements * Long.BYTES; - Assert.geq(payloadBuffer, "payloadBuffer", payloadRead, "payloadRead"); - - if (options.useDeephavenNulls()) { - useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset); - } else { - useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid); - } - - final long overhangPayload = payloadBuffer - payloadRead; - if (overhangPayload > 0) { - is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload)); - } - } - - return chunk; - } - - private static > T castOrCreateChunk( - final WritableChunk outChunk, - final int numRows, - final IntFunction chunkFactory, - final Function, T> castFunction) { - if (outChunk != null) { - return castFunction.apply(outChunk); - } - final T newChunk = chunkFactory.apply(numRows); - newChunk.setSize(numRows); - return newChunk; - } - - private static void useDeephavenNulls( - final LongConversion conversion, - final DataInput is, - final FieldNodeInfo nodeInfo, - final WritableLongChunk chunk, - final int offset) throws IOException { - if (conversion == LongConversion.IDENTITY) { - for (int ii = 0; ii < nodeInfo.numElements; ++ii) { - chunk.set(offset + ii, is.readLong()); - } - } else { - for (int ii = 0; ii < nodeInfo.numElements; ++ii) { - final long in = is.readLong(); - final long out = in == NULL_LONG ? 
in : conversion.apply(in); - chunk.set(offset + ii, out); - } - } - } - - private static void useValidityBuffer( - final LongConversion conversion, - final DataInput is, - final FieldNodeInfo nodeInfo, - final WritableLongChunk chunk, - final int offset, - final WritableLongChunk isValid) throws IOException { - final int numElements = nodeInfo.numElements; - final int numValidityWords = (numElements + 63) / 64; - - int ei = 0; - int pendingSkips = 0; - - for (int vi = 0; vi < numValidityWords; ++vi) { - int bitsLeftInThisWord = Math.min(64, numElements - vi * 64); - long validityWord = isValid.get(vi); - do { - if ((validityWord & 1) == 1) { - if (pendingSkips > 0) { - is.skipBytes(pendingSkips * Long.BYTES); - chunk.fillWithNullValue(offset + ei, pendingSkips); - ei += pendingSkips; - pendingSkips = 0; - } - chunk.set(offset + ei++, conversion.apply(is.readLong())); - validityWord >>= 1; - bitsLeftInThisWord--; - } else { - final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord); - pendingSkips += skips; - validityWord >>= skips; - bitsLeftInThisWord -= skips; - } - } while (bitsLeftInThisWord > 0); - } - - if (pendingSkips > 0) { - is.skipBytes(pendingSkips * Long.BYTES); - chunk.fillWithNullValue(offset + ei, pendingSkips); - } - } } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java new file mode 100644 index 00000000000..743e0a37c8f --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/LongChunkReader.java @@ -0,0 +1,204 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +// ****** AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY +// ****** Edit CharChunkReader and run "./gradlew replicateBarrageUtils" to regenerate +// +// @formatter:off +package io.deephaven.extensions.barrage.chunk; + +import io.deephaven.base.verify.Assert; +import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.extensions.barrage.util.StreamReaderOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; + +import java.io.DataInput; +import java.io.IOException; +import java.util.Iterator; +import java.util.PrimitiveIterator; +import java.util.function.Function; +import java.util.function.IntFunction; + +import static io.deephaven.util.QueryConstants.NULL_LONG; + +public class LongChunkReader implements ChunkReader { + private static final String DEBUG_NAME = "LongChunkReader"; + private final StreamReaderOptions options; + private final LongConversion conversion; + + @FunctionalInterface + public interface LongConversion { + long apply(long in); + + LongConversion IDENTITY = (long a) -> a; + } + + public LongChunkReader(StreamReaderOptions options) { + this(options, LongConversion.IDENTITY); + } + + public LongChunkReader(StreamReaderOptions options, LongConversion conversion) { + this.options = options; + this.conversion = conversion; + } + + public ChunkReader transform(Function transform) { + return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> { + try (final WritableLongChunk inner = LongChunkReader.this.read( + fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { + + final WritableObjectChunk chunk = castOrCreateChunk( + outChunk, + 
+                        Math.max(totalRows, inner.size()),
+                        WritableObjectChunk::makeWritableChunk,
+                        WritableChunk::asWritableObjectChunk);
+
+                if (outChunk == null) {
+                    // if we're not given an output chunk then we better be writing at the front of the new one
+                    Assert.eqZero(outOffset, "outOffset");
+                }
+
+                for (int ii = 0; ii < inner.size(); ++ii) {
+                    long value = inner.get(ii);
+                    chunk.set(outOffset + ii, transform.apply(value));
+                }
+
+                return chunk;
+            }
+        };
+    }
+
+    @Override
+    public WritableLongChunk<Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
+            PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
+            int totalRows) throws IOException {
+
+        final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next();
+        final long validityBuffer = bufferInfoIter.nextLong();
+        final long payloadBuffer = bufferInfoIter.nextLong();
+
+        final WritableLongChunk<Values> chunk = castOrCreateChunk(
+                outChunk,
+                Math.max(totalRows, nodeInfo.numElements),
+                WritableLongChunk::makeWritableChunk,
+                WritableChunk::asWritableLongChunk);
+
+        if (nodeInfo.numElements == 0) {
+            return chunk;
+        }
+
+        final int numValidityLongs = options.useDeephavenNulls() ? 0 : (nodeInfo.numElements + 63) / 64;
+        try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) {
+            if (options.useDeephavenNulls() && validityBuffer != 0) {
+                throw new IllegalStateException("validity buffer is non-empty, but is unnecessary");
+            }
+            int jj = 0;
+            for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
+                isValid.set(jj, is.readLong());
+            }
+            final long valBufRead = jj * 8L;
+            if (valBufRead < validityBuffer) {
+                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
+            }
+            // we support short validity buffers
+            for (; jj < numValidityLongs; ++jj) {
+                isValid.set(jj, -1); // -1 is bit-wise representation of all ones
+            }
+            // consumed entire validity buffer by here
+
+            final long payloadRead = (long) nodeInfo.numElements * Long.BYTES;
+            Assert.geq(payloadBuffer, "payloadBuffer", payloadRead, "payloadRead");
+
+            if (options.useDeephavenNulls()) {
+                useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset);
+            } else {
+                useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid);
+            }
+
+            final long overhangPayload = payloadBuffer - payloadRead;
+            if (overhangPayload > 0) {
+                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload));
+            }
+        }
+
+        return chunk;
+    }
+
+    private static <T extends WritableChunk<Values>> T castOrCreateChunk(
+            final WritableChunk<Values> outChunk,
+            final int numRows,
+            final IntFunction<T> chunkFactory,
+            final Function<WritableChunk<Values>, T> castFunction) {
+        if (outChunk != null) {
+            return castFunction.apply(outChunk);
+        }
+        final T newChunk = chunkFactory.apply(numRows);
+        newChunk.setSize(numRows);
+        return newChunk;
+    }
+
+    private static void useDeephavenNulls(
+            final LongConversion conversion,
+            final DataInput is,
+            final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo,
+            final WritableLongChunk<Values> chunk,
+            final int offset) throws IOException {
+        if (conversion == LongConversion.IDENTITY) {
+            for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
+                chunk.set(offset + ii, is.readLong());
+            }
+        } else {
+            for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
+                final long in = is.readLong();
+                final long out = in == NULL_LONG ?
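// (annotation, not part of the patch) NULL_LONG bypasses the conversion so the null sentinel
// survives decoding; a non-identity LongConversion is the hook a factory could use to rescale
// wire values, e.g. a hypothetical
//     LongConversion nanosToMillis = nanos -> nanos / 1_000_000;
// (illustrative only, not from this patch).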
in : conversion.apply(in); + chunk.set(offset + ii, out); + } + } + } + + private static void useValidityBuffer( + final LongConversion conversion, + final DataInput is, + final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo, + final WritableLongChunk chunk, + final int offset, + final WritableLongChunk isValid) throws IOException { + final int numElements = nodeInfo.numElements; + final int numValidityWords = (numElements + 63) / 64; + + int ei = 0; + int pendingSkips = 0; + + for (int vi = 0; vi < numValidityWords; ++vi) { + int bitsLeftInThisWord = Math.min(64, numElements - vi * 64); + long validityWord = isValid.get(vi); + do { + if ((validityWord & 1) == 1) { + if (pendingSkips > 0) { + is.skipBytes(pendingSkips * Long.BYTES); + chunk.fillWithNullValue(offset + ei, pendingSkips); + ei += pendingSkips; + pendingSkips = 0; + } + chunk.set(offset + ei++, conversion.apply(is.readLong())); + validityWord >>= 1; + bitsLeftInThisWord--; + } else { + final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord); + pendingSkips += skips; + validityWord >>= skips; + bitsLeftInThisWord -= skips; + } + } while (bitsLeftInThisWord > 0); + } + + if (pendingSkips > 0) { + is.skipBytes(pendingSkips * Long.BYTES); + chunk.fillWithNullValue(offset + ei, pendingSkips); + } + } +} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkInputStreamGenerator.java index 68a2ecf86b1..4fd81b47d03 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkInputStreamGenerator.java @@ -7,10 +7,7 @@ // @formatter:off package io.deephaven.extensions.barrage.chunk; -import io.deephaven.base.verify.Assert; import io.deephaven.chunk.ObjectChunk; -import io.deephaven.chunk.WritableChunk; -import io.deephaven.chunk.WritableObjectChunk; import io.deephaven.chunk.attributes.Values; import io.deephaven.chunk.util.pools.PoolableChunk; import io.deephaven.engine.primitive.function.ToShortFunction; @@ -21,17 +18,11 @@ import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.ShortChunk; import io.deephaven.chunk.WritableShortChunk; -import io.deephaven.chunk.WritableLongChunk; import io.deephaven.util.type.TypeUtils; import org.jetbrains.annotations.Nullable; -import java.io.DataInput; import java.io.IOException; import java.io.OutputStream; -import java.util.Iterator; -import java.util.PrimitiveIterator; -import java.util.function.Function; -import java.util.function.IntFunction; import static io.deephaven.util.QueryConstants.*; @@ -167,192 +158,4 @@ public int drainTo(final OutputStream outputStream) throws IOException { return LongSizedDataStructure.intSize("ShortChunkInputStreamGenerator", bytesWritten); } } - - @FunctionalInterface - public interface ShortConversion { - short apply(short in); - - ShortConversion IDENTITY = (short a) -> a; - } - - static WritableShortChunk extractChunkFromInputStream( - final StreamReaderOptions options, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - return extractChunkFromInputStreamWithConversion( - options, ShortConversion.IDENTITY, fieldNodeIter, bufferInfoIter, is, outChunk, 
outOffset, - totalRows); - } - - static WritableObjectChunk extractChunkFromInputStreamWithTransform( - final StreamReaderOptions options, - final Function transform, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - - try (final WritableShortChunk inner = extractChunkFromInputStream( - options, fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { - - final WritableObjectChunk chunk = castOrCreateChunk( - outChunk, - Math.max(totalRows, inner.size()), - WritableObjectChunk::makeWritableChunk, - WritableChunk::asWritableObjectChunk); - - if (outChunk == null) { - // if we're not given an output chunk then we better be writing at the front of the new one - Assert.eqZero(outOffset, "outOffset"); - } - - for (int ii = 0; ii < inner.size(); ++ii) { - short value = inner.get(ii); - chunk.set(outOffset + ii, transform.apply(value)); - } - - return chunk; - } - } - - static WritableShortChunk extractChunkFromInputStreamWithConversion( - final StreamReaderOptions options, - final ShortConversion conversion, - final Iterator fieldNodeIter, - final PrimitiveIterator.OfLong bufferInfoIter, - final DataInput is, - final WritableChunk outChunk, - final int outOffset, - final int totalRows) throws IOException { - - final FieldNodeInfo nodeInfo = fieldNodeIter.next(); - final long validityBuffer = bufferInfoIter.nextLong(); - final long payloadBuffer = bufferInfoIter.nextLong(); - - final WritableShortChunk chunk = castOrCreateChunk( - outChunk, - Math.max(totalRows, nodeInfo.numElements), - WritableShortChunk::makeWritableChunk, - WritableChunk::asWritableShortChunk); - - if (nodeInfo.numElements == 0) { - return chunk; - } - - final int numValidityLongs = options.useDeephavenNulls() ? 
0 : (nodeInfo.numElements + 63) / 64; - try (final WritableLongChunk isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) { - if (options.useDeephavenNulls() && validityBuffer != 0) { - throw new IllegalStateException("validity buffer is non-empty, but is unnecessary"); - } - int jj = 0; - for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) { - isValid.set(jj, is.readLong()); - } - final long valBufRead = jj * 8L; - if (valBufRead < validityBuffer) { - is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead)); - } - // we support short validity buffers - for (; jj < numValidityLongs; ++jj) { - isValid.set(jj, -1); // -1 is bit-wise representation of all ones - } - // consumed entire validity buffer by here - - final long payloadRead = (long) nodeInfo.numElements * Short.BYTES; - Assert.geq(payloadBuffer, "payloadBuffer", payloadRead, "payloadRead"); - - if (options.useDeephavenNulls()) { - useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset); - } else { - useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid); - } - - final long overhangPayload = payloadBuffer - payloadRead; - if (overhangPayload > 0) { - is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload)); - } - } - - return chunk; - } - - private static > T castOrCreateChunk( - final WritableChunk outChunk, - final int numRows, - final IntFunction chunkFactory, - final Function, T> castFunction) { - if (outChunk != null) { - return castFunction.apply(outChunk); - } - final T newChunk = chunkFactory.apply(numRows); - newChunk.setSize(numRows); - return newChunk; - } - - private static void useDeephavenNulls( - final ShortConversion conversion, - final DataInput is, - final FieldNodeInfo nodeInfo, - final WritableShortChunk chunk, - final int offset) throws IOException { - if (conversion == ShortConversion.IDENTITY) { - for (int ii = 0; ii < nodeInfo.numElements; ++ii) { - chunk.set(offset + ii, is.readShort()); - } - } else { - for (int ii = 0; ii < nodeInfo.numElements; ++ii) { - final short in = is.readShort(); - final short out = in == NULL_SHORT ? 
in : conversion.apply(in); - chunk.set(offset + ii, out); - } - } - } - - private static void useValidityBuffer( - final ShortConversion conversion, - final DataInput is, - final FieldNodeInfo nodeInfo, - final WritableShortChunk chunk, - final int offset, - final WritableLongChunk isValid) throws IOException { - final int numElements = nodeInfo.numElements; - final int numValidityWords = (numElements + 63) / 64; - - int ei = 0; - int pendingSkips = 0; - - for (int vi = 0; vi < numValidityWords; ++vi) { - int bitsLeftInThisWord = Math.min(64, numElements - vi * 64); - long validityWord = isValid.get(vi); - do { - if ((validityWord & 1) == 1) { - if (pendingSkips > 0) { - is.skipBytes(pendingSkips * Short.BYTES); - chunk.fillWithNullValue(offset + ei, pendingSkips); - ei += pendingSkips; - pendingSkips = 0; - } - chunk.set(offset + ei++, conversion.apply(is.readShort())); - validityWord >>= 1; - bitsLeftInThisWord--; - } else { - final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord); - pendingSkips += skips; - validityWord >>= skips; - bitsLeftInThisWord -= skips; - } - } while (bitsLeftInThisWord > 0); - } - - if (pendingSkips > 0) { - is.skipBytes(pendingSkips * Short.BYTES); - chunk.fillWithNullValue(offset + ei, pendingSkips); - } - } } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java new file mode 100644 index 00000000000..56c17c2c11f --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ShortChunkReader.java @@ -0,0 +1,204 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +// ****** AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY +// ****** Edit CharChunkReader and run "./gradlew replicateBarrageUtils" to regenerate +// +// @formatter:off +package io.deephaven.extensions.barrage.chunk; + +import io.deephaven.base.verify.Assert; +import io.deephaven.chunk.WritableShortChunk; +import io.deephaven.chunk.WritableChunk; +import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.WritableObjectChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.extensions.barrage.util.StreamReaderOptions; +import io.deephaven.util.datastructures.LongSizedDataStructure; + +import java.io.DataInput; +import java.io.IOException; +import java.util.Iterator; +import java.util.PrimitiveIterator; +import java.util.function.Function; +import java.util.function.IntFunction; + +import static io.deephaven.util.QueryConstants.NULL_SHORT; + +public class ShortChunkReader implements ChunkReader { + private static final String DEBUG_NAME = "ShortChunkReader"; + private final StreamReaderOptions options; + private final ShortConversion conversion; + + @FunctionalInterface + public interface ShortConversion { + short apply(short in); + + ShortConversion IDENTITY = (short a) -> a; + } + + public ShortChunkReader(StreamReaderOptions options) { + this(options, ShortConversion.IDENTITY); + } + + public ShortChunkReader(StreamReaderOptions options, ShortConversion conversion) { + this.options = options; + this.conversion = conversion; + } + + public ChunkReader transform(Function transform) { + return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows) -> { + try (final WritableShortChunk inner = ShortChunkReader.this.read( + fieldNodeIter, bufferInfoIter, is, null, 0, 0)) { + + final WritableObjectChunk chunk = castOrCreateChunk( + 
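// (annotation, not part of the patch) sizing with Math.max(totalRows, inner.size()) lets a
// caller streaming several record batches pre-size one output chunk and land each batch at its
// own outOffset.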
+                        outChunk,
+                        Math.max(totalRows, inner.size()),
+                        WritableObjectChunk::makeWritableChunk,
+                        WritableChunk::asWritableObjectChunk);
+
+                if (outChunk == null) {
+                    // if we're not given an output chunk then we better be writing at the front of the new one
+                    Assert.eqZero(outOffset, "outOffset");
+                }
+
+                for (int ii = 0; ii < inner.size(); ++ii) {
+                    short value = inner.get(ii);
+                    chunk.set(outOffset + ii, transform.apply(value));
+                }
+
+                return chunk;
+            }
+        };
+    }
+
+    @Override
+    public WritableShortChunk<Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
+            PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
+            int totalRows) throws IOException {
+
+        final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next();
+        final long validityBuffer = bufferInfoIter.nextLong();
+        final long payloadBuffer = bufferInfoIter.nextLong();
+
+        final WritableShortChunk<Values> chunk = castOrCreateChunk(
+                outChunk,
+                Math.max(totalRows, nodeInfo.numElements),
+                WritableShortChunk::makeWritableChunk,
+                WritableChunk::asWritableShortChunk);
+
+        if (nodeInfo.numElements == 0) {
+            return chunk;
+        }
+
+        final int numValidityLongs = options.useDeephavenNulls() ? 0 : (nodeInfo.numElements + 63) / 64;
+        try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) {
+            if (options.useDeephavenNulls() && validityBuffer != 0) {
+                throw new IllegalStateException("validity buffer is non-empty, but is unnecessary");
+            }
+            int jj = 0;
+            for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
+                isValid.set(jj, is.readLong());
+            }
+            final long valBufRead = jj * 8L;
+            if (valBufRead < validityBuffer) {
+                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
+            }
+            // we support short validity buffers
+            for (; jj < numValidityLongs; ++jj) {
+                isValid.set(jj, -1); // -1 is bit-wise representation of all ones
+            }
+            // consumed entire validity buffer by here
+
+            final long payloadRead = (long) nodeInfo.numElements * Short.BYTES;
+            Assert.geq(payloadBuffer, "payloadBuffer", payloadRead, "payloadRead");
+
+            if (options.useDeephavenNulls()) {
+                useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset);
+            } else {
+                useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid);
+            }
+
+            final long overhangPayload = payloadBuffer - payloadRead;
+            if (overhangPayload > 0) {
+                is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload));
+            }
+        }
+
+        return chunk;
+    }
+
+    private static <T extends WritableChunk<Values>> T castOrCreateChunk(
+            final WritableChunk<Values> outChunk,
+            final int numRows,
+            final IntFunction<T> chunkFactory,
+            final Function<WritableChunk<Values>, T> castFunction) {
+        if (outChunk != null) {
+            return castFunction.apply(outChunk);
+        }
+        final T newChunk = chunkFactory.apply(numRows);
+        newChunk.setSize(numRows);
+        return newChunk;
+    }
+
+    private static void useDeephavenNulls(
+            final ShortConversion conversion,
+            final DataInput is,
+            final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo,
+            final WritableShortChunk<Values> chunk,
+            final int offset) throws IOException {
+        if (conversion == ShortConversion.IDENTITY) {
+            for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
+                chunk.set(offset + ii, is.readShort());
+            }
+        } else {
+            for (int ii = 0; ii < nodeInfo.numElements; ++ii) {
+                final short in = is.readShort();
+                final short out = in == NULL_SHORT ?
in : conversion.apply(in); + chunk.set(offset + ii, out); + } + } + } + + private static void useValidityBuffer( + final ShortConversion conversion, + final DataInput is, + final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo, + final WritableShortChunk chunk, + final int offset, + final WritableLongChunk isValid) throws IOException { + final int numElements = nodeInfo.numElements; + final int numValidityWords = (numElements + 63) / 64; + + int ei = 0; + int pendingSkips = 0; + + for (int vi = 0; vi < numValidityWords; ++vi) { + int bitsLeftInThisWord = Math.min(64, numElements - vi * 64); + long validityWord = isValid.get(vi); + do { + if ((validityWord & 1) == 1) { + if (pendingSkips > 0) { + is.skipBytes(pendingSkips * Short.BYTES); + chunk.fillWithNullValue(offset + ei, pendingSkips); + ei += pendingSkips; + pendingSkips = 0; + } + chunk.set(offset + ei++, conversion.apply(is.readShort())); + validityWord >>= 1; + bitsLeftInThisWord--; + } else { + final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord); + pendingSkips += skips; + validityWord >>= skips; + bitsLeftInThisWord -= skips; + } + } while (bitsLeftInThisWord > 0); + } + + if (pendingSkips > 0) { + is.skipBytes(pendingSkips * Short.BYTES); + chunk.fillWithNullValue(offset + ei, pendingSkips); + } + } +} diff --git a/replication/static/src/main/java/io/deephaven/replicators/ReplicateBarrageUtils.java b/replication/static/src/main/java/io/deephaven/replicators/ReplicateBarrageUtils.java index 63670ad93d7..6824f8d91f9 100644 --- a/replication/static/src/main/java/io/deephaven/replicators/ReplicateBarrageUtils.java +++ b/replication/static/src/main/java/io/deephaven/replicators/ReplicateBarrageUtils.java @@ -25,6 +25,9 @@ public static void main(final String[] args) throws IOException { fixupChunkInputStreamGen(CHUNK_PACKAGE + "/LongChunkInputStreamGenerator.java", "Long"); fixupChunkInputStreamGen(CHUNK_PACKAGE + "/DoubleChunkInputStreamGenerator.java", "Double"); + ReplicatePrimitiveCode.charToAllButBoolean("replicateBarrageUtils", + CHUNK_PACKAGE + "/CharChunkReader.java"); + ReplicatePrimitiveCode.charToAllButBoolean("replicateBarrageUtils", CHUNK_PACKAGE + "/array/CharArrayExpansionKernel.java");
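// (annotation, not part of the patch) the hunk above registers CharChunkReader with
// ReplicatePrimitiveCode.charToAllButBoolean, which mechanically rewrites the hand-maintained
// Char source into the other primitive variants -- that replication step is what produced the
// auto-generated readers added by this patch, per the "Edit CharChunkReader and run
// ./gradlew replicateBarrageUtils" banner each generated file carries.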