Skip to content

Commit

Permalink
Commit #5, also boolean chunk reader
Browse files Browse the repository at this point in the history
  • Loading branch information
niloc132 committed Jun 27, 2024
1 parent cd2039f commit 5d51345
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,130 +153,4 @@ public int drainTo(final OutputStream outputStream) throws IOException {
return LongSizedDataStructure.intSize(DEBUG_NAME, bytesWritten);
}
}

@FunctionalInterface
public interface ByteConversion {
byte apply(byte in);

ByteConversion IDENTITY = (byte a) -> a;
}

static WritableChunk<Values> extractChunkFromInputStream(
final StreamReaderOptions options,
final Iterator<FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int outOffset,
final int totalRows) throws IOException {
return extractChunkFromInputStreamWithConversion(
options, ByteConversion.IDENTITY, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
}

static WritableChunk<Values> extractChunkFromInputStreamWithConversion(
final StreamReaderOptions options,
final ByteConversion conversion,
final Iterator<FieldNodeInfo> fieldNodeIter,
final PrimitiveIterator.OfLong bufferInfoIter,
final DataInput is,
final WritableChunk<Values> outChunk,
final int outOffset,
final int totalRows) throws IOException {

final FieldNodeInfo nodeInfo = fieldNodeIter.next();
final long validityBuffer = bufferInfoIter.nextLong();
final long payloadBuffer = bufferInfoIter.nextLong();

final WritableByteChunk<Values> chunk;
if (outChunk != null) {
chunk = outChunk.asWritableByteChunk();
} else {
final int numRows = Math.max(totalRows, nodeInfo.numElements);
chunk = WritableByteChunk.makeWritableChunk(numRows);
chunk.setSize(numRows);
}

if (nodeInfo.numElements == 0) {
return chunk;
}

final int numValidityLongs = (nodeInfo.numElements + 63) / 64;
try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) {
int jj = 0;
for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
isValid.set(jj, is.readLong());
}
final long valBufRead = jj * 8L;
if (valBufRead < validityBuffer) {
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
}
// we support short validity buffers
for (; jj < numValidityLongs; ++jj) {
isValid.set(jj, -1); // -1 is bit-wise representation of all ones
}
// consumed entire validity buffer by here

final int numPayloadBytesNeeded = (int) ((nodeInfo.numElements + 7L) / 8L);
if (payloadBuffer < numPayloadBytesNeeded) {
throw new IllegalStateException("payload buffer is too short for expected number of elements");
}

// cannot use deephaven nulls as booleans are not nullable
useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid);

// flight requires that the payload buffer be padded to multiples of 8 bytes
final long payloadRead = getNumLongsForBitPackOfSize(nodeInfo.numElements) * 8L;
final long overhangPayload = payloadBuffer - payloadRead;
if (overhangPayload > 0) {
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload));
}
}

return chunk;
}

private static void useValidityBuffer(
final ByteConversion conversion,
final DataInput is,
final FieldNodeInfo nodeInfo,
final WritableByteChunk<Values> chunk,
final int offset,
final WritableLongChunk<Values> isValid) throws IOException {
final int numElements = nodeInfo.numElements;
final int numValidityWords = (numElements + 63) / 64;

int ei = 0;
int pendingSkips = 0;

for (int vi = 0; vi < numValidityWords; ++vi) {
int bitsLeftInThisWord = Math.min(64, numElements - vi * 64);
long validityWord = isValid.get(vi);
long payloadWord = is.readLong();
do {
if ((validityWord & 1) == 1) {
if (pendingSkips > 0) {
chunk.fillWithNullValue(offset + ei, pendingSkips);
ei += pendingSkips;
pendingSkips = 0;
}
final byte value = (payloadWord & 1) == 1 ? BooleanUtils.TRUE_BOOLEAN_AS_BYTE
: BooleanUtils.FALSE_BOOLEAN_AS_BYTE;
chunk.set(offset + ei++, conversion.apply(value));
validityWord >>= 1;
payloadWord >>= 1;
bitsLeftInThisWord--;
} else {
final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord);
pendingSkips += skips;
validityWord >>= skips;
payloadWord >>= skips;
bitsLeftInThisWord -= skips;
}
} while (bitsLeftInThisWord > 0);
}

if (pendingSkips > 0) {
chunk.fillWithNullValue(offset + ei, pendingSkips);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.chunk.WritableByteChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.util.BooleanUtils;
import io.deephaven.util.datastructures.LongSizedDataStructure;

import java.io.DataInput;
import java.io.IOException;
import java.util.Iterator;
import java.util.PrimitiveIterator;

import static io.deephaven.extensions.barrage.chunk.BaseChunkInputStreamGenerator.getNumLongsForBitPackOfSize;

public class BooleanChunkReader implements ChunkReader {
private static final String DEBUG_NAME = "BooleanChunkReader";

@FunctionalInterface
public interface ByteConversion {
byte apply(byte in);

ByteConversion IDENTITY = (byte a) -> a;
}

private final ByteConversion conversion;

public BooleanChunkReader() {
this(ByteConversion.IDENTITY);
}

public BooleanChunkReader(ByteConversion conversion) {
this.conversion = conversion;
}

@Override
public WritableChunk<Values> read(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {
final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo = fieldNodeIter.next();
final long validityBuffer = bufferInfoIter.nextLong();
final long payloadBuffer = bufferInfoIter.nextLong();

final WritableByteChunk<Values> chunk;
if (outChunk != null) {
chunk = outChunk.asWritableByteChunk();
} else {
final int numRows = Math.max(totalRows, nodeInfo.numElements);
chunk = WritableByteChunk.makeWritableChunk(numRows);
chunk.setSize(numRows);
}

if (nodeInfo.numElements == 0) {
return chunk;
}

final int numValidityLongs = (nodeInfo.numElements + 63) / 64;
try (final WritableLongChunk<Values> isValid = WritableLongChunk.makeWritableChunk(numValidityLongs)) {
int jj = 0;
for (; jj < Math.min(numValidityLongs, validityBuffer / 8); ++jj) {
isValid.set(jj, is.readLong());
}
final long valBufRead = jj * 8L;
if (valBufRead < validityBuffer) {
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, validityBuffer - valBufRead));
}
// we support short validity buffers
for (; jj < numValidityLongs; ++jj) {
isValid.set(jj, -1); // -1 is bit-wise representation of all ones
}
// consumed entire validity buffer by here

final int numPayloadBytesNeeded = (int) ((nodeInfo.numElements + 7L) / 8L);
if (payloadBuffer < numPayloadBytesNeeded) {
throw new IllegalStateException("payload buffer is too short for expected number of elements");
}

// cannot use deephaven nulls as booleans are not nullable
useValidityBuffer(conversion, is, nodeInfo, chunk, outOffset, isValid);

// flight requires that the payload buffer be padded to multiples of 8 bytes
final long payloadRead = getNumLongsForBitPackOfSize(nodeInfo.numElements) * 8L;
final long overhangPayload = payloadBuffer - payloadRead;
if (overhangPayload > 0) {
is.skipBytes(LongSizedDataStructure.intSize(DEBUG_NAME, overhangPayload));
}
}

return chunk;
}


private static void useValidityBuffer(
final ByteConversion conversion,
final DataInput is,
final ChunkInputStreamGenerator.FieldNodeInfo nodeInfo,
final WritableByteChunk<Values> chunk,
final int offset,
final WritableLongChunk<Values> isValid) throws IOException {
final int numElements = nodeInfo.numElements;
final int numValidityWords = (numElements + 63) / 64;

int ei = 0;
int pendingSkips = 0;

for (int vi = 0; vi < numValidityWords; ++vi) {
int bitsLeftInThisWord = Math.min(64, numElements - vi * 64);
long validityWord = isValid.get(vi);
long payloadWord = is.readLong();
do {
if ((validityWord & 1) == 1) {
if (pendingSkips > 0) {
chunk.fillWithNullValue(offset + ei, pendingSkips);
ei += pendingSkips;
pendingSkips = 0;
}
final byte value = (payloadWord & 1) == 1 ? BooleanUtils.TRUE_BOOLEAN_AS_BYTE
: BooleanUtils.FALSE_BOOLEAN_AS_BYTE;
chunk.set(offset + ei++, conversion.apply(value));
validityWord >>= 1;
payloadWord >>= 1;
bitsLeftInThisWord--;
} else {
final int skips = Math.min(Long.numberOfTrailingZeros(validityWord), bitsLeftInThisWord);
pendingSkips += skips;
validityWord >>= skips;
payloadWord >>= skips;
bitsLeftInThisWord -= skips;
}
} while (bitsLeftInThisWord > 0);
}

if (pendingSkips > 0) {
chunk.fillWithNullValue(offset + ei, pendingSkips);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ public ChunkReader extractChunkFromInputStream(StreamReaderOptions options, int
return new CharChunkReader(options);
case Byte:
if (typeInfo.type() == Boolean.class || typeInfo.type() == boolean.class) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
totalRows) -> BooleanChunkInputStreamGenerator.extractChunkFromInputStream(
options, fieldNodeIter, bufferInfoIter, is, outChunk, outOffset, totalRows);
return new BooleanChunkReader();
}
return new ByteChunkReader(options);
case Short:
Expand Down

0 comments on commit 5d51345

Please sign in to comment.