Skip to content

Commit

Permalink
feat: adding safe handlers and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vibhatha committed Mar 6, 2024
1 parent b7fa725 commit 4526423
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@

/**
* BaseVariableWidthVector is a base class providing functionality for strings/bytes types.
* TODO: Update the interface for N buffers
*
*/
public abstract class BaseVariableWidthViewVector extends BaseValueVector
implements VariableWidthVector, FieldVector, VectorDefinitionSetter {
private static final int DEFAULT_RECORD_BYTE_COUNT = 8;
private static final int DEFAULT_RECORD_BYTE_COUNT = 16;
private static final int INITIAL_BYTE_COUNT = INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT;
private static final int MAX_BUFFER_SIZE = (int) Math.min(MAX_ALLOCATION_SIZE, Integer.MAX_VALUE);
private int lastValueCapacity;
Expand Down Expand Up @@ -132,6 +132,7 @@ public ArrowBuf getValidityBuffer() {
*/
@Override
public ArrowBuf getDataBuffer() {
// TODO: fixme
return valueBuffer;
}

Expand Down Expand Up @@ -1189,9 +1190,7 @@ public void set(int index, ByteBuffer value, int start, int length) {
assert index >= 0;
fillHoles(index);
BitVectorHelper.setBit(validityBuffer, index);
final int startOffset = getStartOffset(index);
offsetBuffer.setInt((index + 1) * ((long) OFFSET_WIDTH), startOffset + length);
valueBuffer.setBytes(startOffset, value, start, length);
setBytes(index, value.array(), start, length);
lastSet = index;
}

Expand All @@ -1210,9 +1209,7 @@ public void setSafe(int index, ByteBuffer value, int start, int length) {
handleSafe(index, length);
fillHoles(index);
BitVectorHelper.setBit(validityBuffer, index);
final int startOffset = getStartOffset(index);
offsetBuffer.setInt((index + 1) * ((long) OFFSET_WIDTH), startOffset + length);
valueBuffer.setBytes(startOffset, value, start, length);
setBytes(index, value.array(), start, length);
lastSet = index;
}

Expand Down Expand Up @@ -1245,9 +1242,9 @@ public void set(int index, int isSet, int start, int end, ArrowBuf buffer) {
final int dataLength = end - start;
fillHoles(index);
BitVectorHelper.setValidityBit(validityBuffer, index, isSet);
final int startOffset = offsetBuffer.getInt((long) index * OFFSET_WIDTH);
offsetBuffer.setInt((index + 1) * ((long) OFFSET_WIDTH), startOffset + dataLength);
valueBuffer.setBytes(startOffset, buffer, start, dataLength);
byte[] data = new byte[dataLength];
buffer.getBytes(start, data, 0, dataLength);
setBytes(index, data, start, dataLength);
lastSet = index;
}

Expand All @@ -1268,9 +1265,9 @@ public void setSafe(int index, int isSet, int start, int end, ArrowBuf buffer) {
handleSafe(index, dataLength);
fillHoles(index);
BitVectorHelper.setValidityBit(validityBuffer, index, isSet);
final int startOffset = offsetBuffer.getInt((long) index * OFFSET_WIDTH);
offsetBuffer.setInt((long) (index + 1) * OFFSET_WIDTH, startOffset + dataLength);
valueBuffer.setBytes(startOffset, buffer, start, dataLength);
byte[] data = new byte[dataLength];
buffer.getBytes(start, data, 0, dataLength);
setBytes(index, data, start, dataLength);
lastSet = index;
}

Expand All @@ -1287,10 +1284,9 @@ public void set(int index, int start, int length, ArrowBuf buffer) {
assert index >= 0;
fillHoles(index);
BitVectorHelper.setBit(validityBuffer, index);
final int startOffset = offsetBuffer.getInt((long) index * OFFSET_WIDTH);
offsetBuffer.setInt((long) (index + 1) * OFFSET_WIDTH, startOffset + length);
final ArrowBuf bb = buffer.slice(start, length);
valueBuffer.setBytes(startOffset, bb);
byte[] data = new byte[length];
buffer.getBytes(start, data, 0, length);
setBytes(index, data, start, length);
lastSet = index;
}

Expand All @@ -1309,10 +1305,9 @@ public void setSafe(int index, int start, int length, ArrowBuf buffer) {
handleSafe(index, length);
fillHoles(index);
BitVectorHelper.setBit(validityBuffer, index);
final int startOffset = getStartOffset(index);
offsetBuffer.setInt((long) (index + 1) * OFFSET_WIDTH, startOffset + length);
final ArrowBuf bb = buffer.slice(start, length);
valueBuffer.setBytes(startOffset, bb);
byte[] data = new byte[length];
buffer.getBytes(start, data, 0, length);
setBytes(index, data, start, length);
lastSet = index;
}

Expand All @@ -1331,58 +1326,58 @@ protected final void fillHoles(int index) {
lastSet = index - 1;
}

protected void createValueBuffer(BufferAllocator allocator, byte[] value,
// TODO: rename to createViewBuffer
protected void createValueBuffer(BufferAllocator allocator, byte[] value, int start, int length,
ArrowBuf valueBuffer, List<ArrowBuf> dataBuffers) {
// TODO: handle value.length=0 case
if (value.length <= INLINE_SIZE) {
// inline buffer
// set length
valueBuffer.setInt(valueBuffer.writerIndex(), value.length);
valueBuffer.setInt(valueBuffer.writerIndex(), length);
// set data
valueBuffer.writerIndex(valueBuffer.writerIndex() + LENGTH_WIDTH);
valueBuffer.setBytes(valueBuffer.writerIndex(), value, 0, value.length);
valueBuffer.setBytes(valueBuffer.writerIndex(), value, start, length);
valueBuffer.writerIndex(valueBuffer.writerIndex() + INLINE_BUF_DATA_WIDTH);
} else {
// reference buffer
if (dataBuffers.isEmpty()) {
// first data buffer needs to be added
ArrowBuf newDataBuf = allocator.buffer(lastValueAllocationSizeInBytes);
// set length
valueBuffer.setInt(valueBuffer.writerIndex(), value.length);
valueBuffer.setInt(valueBuffer.writerIndex(), length);
valueBuffer.writerIndex(valueBuffer.writerIndex() + LENGTH_WIDTH);
// set prefix
valueBuffer.setBytes(valueBuffer.writerIndex(), value, 0, PREFIX_WIDTH);
valueBuffer.setBytes(valueBuffer.writerIndex(), value, start, PREFIX_WIDTH);
valueBuffer.writerIndex(valueBuffer.writerIndex() + PREFIX_WIDTH);
// set buf id
valueBuffer.setInt(valueBuffer.writerIndex(), /*first buffer*/0);
valueBuffer.writerIndex(valueBuffer.writerIndex() + BUF_INDEX_WIDTH);
// set offset
valueBuffer.setInt(valueBuffer.writerIndex(), 0);
valueBuffer.writerIndex(valueBuffer.writerIndex() + BUF_OFFSET_WIDTH);
newDataBuf.setBytes(0, value, 0, value.length);
newDataBuf.writerIndex(value.length);
newDataBuf.setBytes(0, value, 0, length);
newDataBuf.writerIndex(length);
dataBuffers.add(newDataBuf);
} else {
// insert to the last buffer in the data buffers or allocate new if the last buffer isn't enough
// set length
valueBuffer.setInt(valueBuffer.writerIndex(), value.length);
valueBuffer.setInt(valueBuffer.writerIndex(), length);
valueBuffer.writerIndex(valueBuffer.writerIndex() + LENGTH_WIDTH);
// set prefix
valueBuffer.setBytes(valueBuffer.writerIndex(), value, 0, PREFIX_WIDTH);
valueBuffer.setBytes(valueBuffer.writerIndex(), value, start, PREFIX_WIDTH);
valueBuffer.writerIndex(valueBuffer.writerIndex() + PREFIX_WIDTH);
// set buf id
int currentBufId = dataBuffers.size() - 1;
ArrowBuf currentBuf = dataBuffers.get(currentBufId);
if (currentBuf.capacity() - currentBuf.writerIndex() >= value.length) {
if (currentBuf.capacity() - currentBuf.writerIndex() >= length) {
// current buffer is enough
// set buf index
valueBuffer.setInt(valueBuffer.writerIndex(), currentBufId);
valueBuffer.writerIndex(valueBuffer.writerIndex() + BUF_INDEX_WIDTH);
// set offset
valueBuffer.setInt(valueBuffer.writerIndex(), (int) currentBuf.writerIndex());
valueBuffer.writerIndex(valueBuffer.writerIndex() + BUF_OFFSET_WIDTH);
currentBuf.setBytes(currentBuf.writerIndex(), value, 0, value.length);
currentBuf.writerIndex(currentBuf.writerIndex() + value.length);
currentBuf.setBytes(currentBuf.writerIndex(), value, start, length);
currentBuf.writerIndex(currentBuf.writerIndex() + length);
dataBuffers.set(currentBufId, currentBuf); // is this needed?
} else {
// current buffer is not enough
Expand All @@ -1394,8 +1389,8 @@ protected void createValueBuffer(BufferAllocator allocator, byte[] value,
// set offset
valueBuffer.setInt(valueBuffer.writerIndex(), 0);
valueBuffer.writerIndex(valueBuffer.writerIndex() + BUF_OFFSET_WIDTH);
newBuf.setBytes(0, value, 0, value.length);
newBuf.writerIndex(newBuf.writerIndex() + value.length);
newBuf.setBytes(0, value, start, length);
newBuf.writerIndex(newBuf.writerIndex() + length);
dataBuffers.add(newBuf);
}
}
Expand All @@ -1412,7 +1407,7 @@ protected final void setBytes(int index, byte[] value, int start, int length) {
offsetBuffer.setInt((long) (index + 1) * OFFSET_WIDTH, startOffset + length);
/* store the var length data in value buffer */
/*check whether the buffer is inline or reference buffer*/
createValueBuffer(allocator, value, valueBuffer, dataBuffers);
createValueBuffer(allocator, value, start, length, valueBuffer, dataBuffers);
}

public final int getStartOffset(int index) {
Expand Down Expand Up @@ -1464,6 +1459,7 @@ protected final void handleSafe(int index, int dataLength) {
* @return array of bytes
*/
public static byte[] get(final ArrowBuf data, final ArrowBuf offset, int index) {
// TODO: fixme
final int currentStartOffset = offset.getInt((long) index * OFFSET_WIDTH);
final int dataLength =
offset.getInt((long) (index + 1) * OFFSET_WIDTH) - currentStartOffset;
Expand All @@ -1488,6 +1484,7 @@ public static byte[] get(final ArrowBuf data, final ArrowBuf offset, int index)
*/
public static ArrowBuf set(ArrowBuf buffer, BufferAllocator allocator,
int valueCount, int index, int value) {
// TODO: fixme
if (buffer == null) {
buffer = allocator.buffer((long) valueCount * OFFSET_WIDTH);
}
Expand All @@ -1509,6 +1506,7 @@ public static ArrowBuf set(ArrowBuf buffer, BufferAllocator allocator,
*/
@Override
public void copyFrom(int fromIndex, int thisIndex, ValueVector from) {
// TODO: fixme
Preconditions.checkArgument(this.getMinorType() == from.getMinorType());
if (from.isNull(fromIndex)) {
fillHoles(thisIndex);
Expand Down Expand Up @@ -1539,6 +1537,7 @@ public void copyFrom(int fromIndex, int thisIndex, ValueVector from) {
*/
@Override
public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
// TODO: fixme
Preconditions.checkArgument(this.getMinorType() == from.getMinorType());
if (from.isNull(fromIndex)) {
handleSafe(thisIndex, 0);
Expand All @@ -1562,11 +1561,13 @@ public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {

@Override
public ArrowBufPointer getDataPointer(int index) {
// TODO: fixme
return getDataPointer(index, new ArrowBufPointer());
}

@Override
public ArrowBufPointer getDataPointer(int index, ArrowBufPointer reuse) {
// TODO: fixme
if (isNull(index)) {
reuse.set(null, 0, 0);
} else {
Expand All @@ -1584,6 +1585,7 @@ public int hashCode(int index) {

@Override
public int hashCode(int index, ArrowBufHasher hasher) {
// TODO: fixme
if (isNull(index)) {
return ArrowBufPointer.NULL_HASH_CODE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ public void get(int index, NullableViewVarCharHolder holder) {
* @param holder holder that carries data buffer.
*/
public void set(int index, ViewVarCharHolder holder) {
// TODO: fix this
assert index >= 0;
fillHoles(index);
BitVectorHelper.setBit(validityBuffer, index);
Expand All @@ -247,6 +248,7 @@ public void set(int index, ViewVarCharHolder holder) {
* @param holder holder that carries data buffer.
*/
public void setSafe(int index, ViewVarCharHolder holder) {
// TODO: fix this
assert index >= 0;
final int dataLength = holder.end - holder.start;
handleSafe(index, dataLength);
Expand Down Expand Up @@ -298,9 +300,6 @@ public void setSafe(int index, NullableViewVarCharHolder holder) {
final int dataLength = holder.end - holder.start;
handleSafe(index, dataLength);
fillHoles(index);
// final int startOffset = getStartOffset(index);
// offsetBuffer.setInt((index + 1) * ((long) OFFSET_WIDTH), startOffset + dataLength);
// valueBuffer.setBytes(startOffset, holder.buffer, holder.start, dataLength);
byte[] data = new byte[dataLength];
holder.buffer.getBytes(holder.start, data, 0, dataLength);
setBytes(index, data, holder.start, dataLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public static VarCharVector newVarCharVector(String name, BufferAllocator alloca
FieldType.nullable(new ArrowType.Utf8()).createNewSingleVector(name, allocator, null);
}

public static ViewVarCharVector newViewVarCharVector(String name, BufferAllocator allocator) {
return (ViewVarCharVector)
FieldType.nullable(new ArrowType.Utf8View()).createNewSingleVector(name, allocator, null);
}

public static VarBinaryVector newVarBinaryVector(String name, BufferAllocator allocator) {
return (VarBinaryVector)
FieldType.nullable(new ArrowType.Binary()).createNewSingleVector(name, allocator, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.arrow.vector.TestUtils.newVarBinaryVector;
import static org.apache.arrow.vector.TestUtils.newVarCharVector;
import static org.apache.arrow.vector.TestUtils.newVector;
import static org.apache.arrow.vector.TestUtils.newViewVarCharVector;
import static org.apache.arrow.vector.testing.ValueVectorDataPopulator.setVector;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -316,7 +317,7 @@ public void testFixedType2() {
}
}

@Test /* VarCharVector */
@Test /* VarCharVector and ViewVarCharVector */
public void testSizeOfValueBuffer() {
try (final VarCharVector vector = new VarCharVector(EMPTY_SCHEMA_PATH, allocator)) {
int valueCount = 100;
Expand All @@ -331,6 +332,20 @@ public void testSizeOfValueBuffer() {

assertEquals(currentSize, vector.sizeOfValueBuffer());
}

try (final ViewVarCharVector vector = new ViewVarCharVector(EMPTY_SCHEMA_PATH, allocator)) {
int valueCount = 3;
int currentSize = 0;
vector.setInitialCapacity(valueCount);
vector.allocateNew();
vector.setValueCount(valueCount);
for (int i = 0; i < valueCount; i++) {
currentSize += i;
vector.setSafe(i, new byte[i]);
}

assertEquals(currentSize, vector.sizeOfValueBuffer());
}
}

@Test
Expand Down Expand Up @@ -1264,10 +1279,11 @@ public void testSplitAndTransfer4() {
}
}

@Test /* VarCharVector */
@Test /* VarCharVector and ViewVarCharVector */
public void testNullableVarType1() {

// Create a new value vector for 1024 integers.
// VarCharVector
try (final VarCharVector vector = newVarCharVector(EMPTY_SCHEMA_PATH, allocator)) {
vector.allocateNew(1024 * 10, 1024);

Expand Down Expand Up @@ -1299,6 +1315,39 @@ public void testNullableVarType1() {
// Ensure null value throws.
assertNull(vector.get(8));
}

// ViewVarCharVector
try (final ViewVarCharVector vector = newViewVarCharVector(EMPTY_SCHEMA_PATH, allocator)) {
vector.allocateNew(1024 * 10, 1024);

vector.set(0, STR1);
vector.set(1, STR2);
vector.set(2, STR3);
vector.setSafe(3, STR3, 1, STR3.length - 1);
vector.setSafe(4, STR3, 2, STR3.length - 2);
ByteBuffer str3ByteBuffer = ByteBuffer.wrap(STR3);
vector.setSafe(5, str3ByteBuffer, 1, STR3.length - 1);
vector.setSafe(6, str3ByteBuffer, 2, STR3.length - 2);

// Set with convenience function
Text txt = new Text("foo");
vector.setSafe(7, txt);

// Check the sample strings.
assertArrayEquals(STR1, vector.get(0));
assertArrayEquals(STR2, vector.get(1));
assertArrayEquals(STR3, vector.get(2));
assertArrayEquals(Arrays.copyOfRange(STR3, 1, STR3.length), vector.get(3));
assertArrayEquals(Arrays.copyOfRange(STR3, 2, STR3.length), vector.get(4));
assertArrayEquals(Arrays.copyOfRange(STR3, 1, STR3.length), vector.get(5));
assertArrayEquals(Arrays.copyOfRange(STR3, 2, STR3.length), vector.get(6));

// Check returning a Text object
assertEquals(txt, vector.getObject(7));

// Ensure null value throws.
assertNull(vector.get(8));
}
}

@Test
Expand Down

0 comments on commit 4526423

Please sign in to comment.