Skip to content

Commit

Permalink
Commit #4, replicate new chunk readers for primitives
Browse files Browse the repository at this point in the history
  • Loading branch information
niloc132 committed Jun 27, 2024
1 parent 117c94f commit cd2039f
Show file tree
Hide file tree
Showing 16 changed files with 1,440 additions and 1,417 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
// @formatter:off
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.primitive.function.ToByteFunction;
Expand All @@ -21,17 +18,11 @@
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.WritableByteChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.util.type.TypeUtils;
import org.jetbrains.annotations.Nullable;

import java.io.DataInput;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.PrimitiveIterator;
import java.util.function.Function;
import java.util.function.IntFunction;

import static io.deephaven.util.QueryConstants.*;

Expand Down Expand Up @@ -167,192 +158,4 @@ public int drainTo(final OutputStream outputStream) throws IOException {
return LongSizedDataStructure.intSize("ByteChunkInputStreamGenerator", bytesWritten);
}
}

/**
 * A per-element conversion applied to each primitive byte as it is read off the wire.
 */
@FunctionalInterface
public interface ByteConversion {
    /**
     * Maps a byte read from the stream to the value stored in the destination chunk.
     *
     * @param in the byte read from the input stream
     * @return the converted byte
     */
    byte apply(byte in);

    /**
     * Pass-through conversion; callers compare against this instance by reference to take a
     * no-conversion fast path.
     */
    ByteConversion IDENTITY = in -> in;
}

/**
 * Reads one field node's byte data from {@code is} into a {@link WritableByteChunk} with no
 * per-element conversion applied.
 *
 * @param options        stream reader options (controls null encoding)
 * @param fieldNodeIter  iterator over field node metadata; one node is consumed
 * @param bufferInfoIter iterator over buffer lengths; validity and payload lengths are consumed
 * @param is             the stream to read from
 * @param outChunk       optional chunk to write into; a new chunk is allocated when null
 * @param outOffset      offset within the output chunk to begin writing
 * @param totalRows      minimum capacity for a newly allocated chunk
 * @return the populated chunk (either {@code outChunk} or a newly allocated one)
 * @throws IOException if the stream cannot be read
 */
static WritableByteChunk<Values> extractChunkFromInputStream(
        final StreamReaderOptions options,
        final Iterator<FieldNodeInfo> fieldNodeIter,
        final PrimitiveIterator.OfLong bufferInfoIter,
        final DataInput is,
        final WritableChunk<Values> outChunk,
        final int outOffset,
        final int totalRows) throws IOException {
    // Delegate to the converting variant using the identity conversion.
    return extractChunkFromInputStreamWithConversion(options, ByteConversion.IDENTITY,
            fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}

/**
 * Reads one field node's byte data from {@code is} and boxes each element through
 * {@code transform}, producing an object chunk.
 *
 * @param <T>            destination element type
 * @param options        stream reader options (controls null encoding)
 * @param transform      maps each boxed byte to the destination element type
 * @param fieldNodeIter  iterator over field node metadata; one node is consumed
 * @param bufferInfoIter iterator over buffer lengths; validity and payload lengths are consumed
 * @param is             the stream to read from
 * @param outChunk       optional chunk to write into; a new chunk is allocated when null
 * @param outOffset      offset within the output chunk to begin writing; must be zero when
 *                       {@code outChunk} is null
 * @param totalRows      minimum capacity for a newly allocated chunk
 * @return the populated object chunk
 * @throws IOException if the stream cannot be read
 */
static <T> WritableObjectChunk<T, Values> extractChunkFromInputStreamWithTransform(
        final StreamReaderOptions options,
        final Function<Byte, T> transform,
        final Iterator<FieldNodeInfo> fieldNodeIter,
        final PrimitiveIterator.OfLong bufferInfoIter,
        final DataInput is,
        final WritableChunk<Values> outChunk,
        final int outOffset,
        final int totalRows) throws IOException {

    // First materialize the raw primitive bytes into a temporary chunk, then box them.
    try (final WritableByteChunk<Values> rawBytes = extractChunkFromInputStream(
            options, fieldNodeIter, bufferInfoIter, is, null, 0, 0)) {

        final WritableObjectChunk<T, Values> result = castOrCreateChunk(
                outChunk,
                Math.max(totalRows, rawBytes.size()),
                WritableObjectChunk::makeWritableChunk,
                WritableChunk::asWritableObjectChunk);

        if (outChunk == null) {
            // A freshly allocated chunk must be written from the front.
            Assert.eqZero(outOffset, "outOffset");
        }

        final int count = rawBytes.size();
        for (int idx = 0; idx < count; ++idx) {
            result.set(outOffset + idx, transform.apply(rawBytes.get(idx)));
        }

        return result;
    }
}

/**
 * Reads one field node's byte data from {@code is} into a {@link WritableByteChunk}, applying
 * {@code conversion} to each element. Consumes exactly one field node from
 * {@code fieldNodeIter} and two buffer lengths (validity, then payload) from
 * {@code bufferInfoIter}, and advances {@code is} past both buffers including any padding.
 *
 * @param options        stream reader options; when {@code useDeephavenNulls()} is set there is
 *                       no validity bitmap and null is encoded in-band as a sentinel value
 * @param conversion     per-element conversion applied to each byte read
 * @param fieldNodeIter  iterator over field node metadata; one node is consumed
 * @param bufferInfoIter iterator over buffer byte-lengths; two values are consumed
 * @param is             the stream to read from
 * @param outChunk       optional chunk to write into; a new chunk is allocated when null
 * @param outOffset      offset within the output chunk to begin writing
 * @param totalRows      minimum capacity for a newly allocated chunk
 * @return the populated chunk (either {@code outChunk} or a newly allocated one)
 * @throws IOException           if the stream cannot be read
 * @throws IllegalStateException if a validity buffer is present but Deephaven nulls are in use
 */
static WritableByteChunk<Values> extractChunkFromInputStreamWithConversion(
final StreamReaderOptions options,
final ByteConversion conversion,
final Iterator<FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int outOffset,
final int totalRows) throws IOException {

// Consume one field node and its two buffer lengths: validity bitmap first, payload second.
final FieldNodeInfo nodeInfo = fieldNodeIter.next();
final long validityBuffer = bufferInfoIter.nextLong();
final long payloadBuffer = bufferInfoIter.nextLong();

// Reuse the caller's chunk when provided; otherwise allocate one large enough for the data.
final WritableByteChunk<Values> chunk = castOrCreateChunk(
outChunk,
Math.max(totalRows, nodeInfo.numElements),
WritableByteChunk::makeWritableChunk,
WritableChunk::asWritableByteChunk);

if (nodeInfo.numElements == 0) {
return chunk;
}

// One validity bit per element, packed into 64-bit words; none when Deephaven nulls are used.
final int numValidityLongs = options.useDeephavenNulls() ? 0 : (nodeInfo.numElements + 63) / 64;
try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) {
if (options.useDeephavenNulls() && validityBuffer != 0) {
throw new IllegalStateException("validity buffer is non-empty, but is unnecessary");
}
// Read as many full validity words as the wire buffer actually contains.
int jj = 0;
for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
isValid.set(jj, is.readLong());
}
// Skip any trailing validity bytes beyond the whole words consumed above.
final long valBufRead = jj * 8L;
if (valBufRead < validityBuffer) {
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
}
// we support short validity buffers
for (; jj < numValidityLongs; ++jj) {
isValid.set(jj, -1); // -1 is bit-wise representation of all ones
}
// consumed entire validity buffer by here

// The payload must contain at least one byte per element.
final long payloadRead = (long) nodeInfo.numElements * Byte.BYTES;
Assert.geq(payloadBuffer, "payloadBuffer", payloadRead, "payloadRead");

if (options.useDeephavenNulls()) {
useDeephavenNulls(conversion, is, nodeInfo, chunk, outOffset);
} else {
useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid);
}

// Skip any payload padding past the elements we actually read, to leave the stream aligned.
final long overhangPayload = payloadBuffer - payloadRead;
if (overhangPayload > 0) {
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload));
}
}

return chunk;
}

/**
 * Returns the caller-supplied chunk cast to the requested type, or — when none was supplied —
 * a freshly allocated chunk sized and set to {@code numRows}.
 *
 * @param <T>          the concrete writable chunk type
 * @param outChunk     caller-provided chunk, or null to allocate
 * @param numRows      capacity and size for a newly allocated chunk
 * @param chunkFactory allocates a new chunk of the requested capacity
 * @param castFunction casts the caller-provided chunk to the requested type
 * @return a chunk of type {@code T} ready to be written
 */
private static <T extends WritableChunk<Values>> T castOrCreateChunk(
        final WritableChunk<Values> outChunk,
        final int numRows,
        final IntFunction<T> chunkFactory,
        final Function<WritableChunk<Values>, T> castFunction) {
    if (outChunk == null) {
        final T created = chunkFactory.apply(numRows);
        created.setSize(numRows);
        return created;
    }
    return castFunction.apply(outChunk);
}

/**
 * Fills {@code chunk} from {@code is} when nulls are encoded in-band as the Deephaven sentinel
 * value (no validity bitmap on the wire).
 *
 * @param conversion per-element conversion; never applied to the null sentinel
 * @param is         the stream to read from
 * @param nodeInfo   field node metadata supplying the element count
 * @param chunk      destination chunk
 * @param offset     offset within {@code chunk} to begin writing
 * @throws IOException if the stream cannot be read
 */
private static void useDeephavenNulls(
        final ByteConversion conversion,
        final DataInput is,
        final FieldNodeInfo nodeInfo,
        final WritableByteChunk<Values> chunk,
        final int offset) throws IOException {
    final int numElements = nodeInfo.numElements;
    if (conversion == ByteConversion.IDENTITY) {
        // Fast path: no conversion, copy the bytes straight through.
        for (int idx = 0; idx < numElements; ++idx) {
            chunk.set(offset + idx, is.readByte());
        }
        return;
    }
    for (int idx = 0; idx < numElements; ++idx) {
        final byte raw = is.readByte();
        // The null sentinel must pass through unconverted so it stays a null.
        chunk.set(offset + idx, raw == NULL_BYTE ? raw : conversion.apply(raw));
    }
}

/**
 * Fills {@code chunk} from {@code is} using the validity bitmap in {@code isValid}: elements
 * whose bit is set are read from the payload (through {@code conversion}); elements whose bit
 * is clear have their payload bytes skipped and are written as null values. Consecutive
 * invalid elements are batched into a single skip/fill to reduce the number of stream calls.
 *
 * @param conversion per-element conversion applied to each valid byte
 * @param is         the stream positioned at the start of the payload
 * @param nodeInfo   field node metadata supplying the element count
 * @param chunk      destination chunk
 * @param offset     offset within {@code chunk} to begin writing
 * @param isValid    validity bitmap, one bit per element packed into 64-bit words
 * @throws IOException if the stream cannot be read
 */
private static void useValidityBuffer(
final ByteConversion conversion,
final DataInput is,
final FieldNodeInfo nodeInfo,
final WritableByteChunk<Values> chunk,
final int offset,
final WritableLongChunk<Values> isValid) throws IOException {
final int numElements = nodeInfo.numElements;
final int numValidityWords = (numElements + 63) / 64;

// ei: next element index to write; pendingSkips: run of invalid elements not yet consumed.
int ei = 0;
int pendingSkips = 0;

for (int vi = 0; vi < numValidityWords; ++vi) {
// The final word may cover fewer than 64 elements.
int bitsLeftInThisWord = Math.min(64, numElements - vi * 64);
long validityWord = isValid.get(vi);
do {
if ((validityWord & 1) == 1) {
// Valid element: first flush any batched skips, then read and convert this value.
if (pendingSkips > 0) {
is.skipBytes(pendingSkips * Byte.BYTES);
chunk.fillWithNullValue(offset + ei, pendingSkips);
ei += pendingSkips;
pendingSkips = 0;
}
chunk.set(offset + ei++, conversion.apply(is.readByte()));
validityWord >>= 1;
bitsLeftInThisWord--;
} else {
// Invalid run: count the trailing zero bits (capped at this word's remaining bits)
// and defer the skip so consecutive nulls are handled in one batch.
final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord);
pendingSkips += skips;
validityWord >>= skips;
bitsLeftInThisWord -= skips;
}
} while (bitsLeftInThisWord > 0);
}

// Flush a trailing run of invalid elements at the end of the node.
if (pendingSkips > 0) {
is.skipBytes(pendingSkips * Byte.BYTES);
chunk.fillWithNullValue(offset + ei, pendingSkips);
}
}
}
Loading

0 comments on commit cd2039f

Please sign in to comment.