From ab51fb18a1c339f8f8411df4ff07bf33832a1a15 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Tue, 18 Jun 2024 13:44:15 +0530 Subject: [PATCH] fix: initial functional archery test --- .../arrow/tools/ArrowFileTestFixtures.java | 5 +- .../apache/arrow/tools/TestIntegration.java | 42 +--- .../vector/BaseVariableWidthViewVector.java | 3 +- .../arrow/vector/ipc/JsonFileReader.java | 194 ++++++++++-------- .../arrow/vector/ipc/JsonFileWriter.java | 89 ++------ 5 files changed, 134 insertions(+), 199 deletions(-) diff --git a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java index 589f49ab91e7c..f00021964f819 100644 --- a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java +++ b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java @@ -16,6 +16,7 @@ */ package org.apache.arrow.tools; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import java.io.File; @@ -153,7 +154,7 @@ static void write(FieldVector parent, File file) throws IOException { static void writeInput(File testInFile, BufferAllocator allocator) throws IOException { try (BufferAllocator vectorAllocator = - allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); NonNullableStructVector parent = NonNullableStructVector.empty("parent", vectorAllocator)) { writeData(COUNT, parent); write(parent.getChild("root"), testInFile); @@ -161,7 +162,7 @@ static void writeInput(File testInFile, BufferAllocator allocator) throws IOExce } static void writeVariableWidthViewInput(File testInFile, BufferAllocator allocator, int count) - throws FileNotFoundException, IOException { + throws IOException { try (BufferAllocator vectorAllocator = allocator.newChildAllocator("original view vectors", 0, Integer.MAX_VALUE); NonNullableStructVector parent = NonNullableStructVector.empty("parent", vectorAllocator)) { diff --git a/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java b/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java index fa892921bcbe8..2a49c76f7a49d 100644 --- a/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java +++ b/java/tools/src/test/java/org/apache/arrow/tools/TestIntegration.java @@ -45,7 +45,6 @@ import org.apache.arrow.vector.complex.writer.BigIntWriter; import org.apache.arrow.vector.complex.writer.Float8Writer; import org.apache.arrow.vector.complex.writer.IntWriter; -import org.apache.commons.cli.ParseException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -350,9 +349,9 @@ public void testInvalid() throws Exception { } @Test - public void testJSONRoundTripWithVariableWidthView() throws Exception { + public void testValidateVariableWidthView() throws Exception { final int valueCount = 256; - final int multiplier = 1; + final int multiplier = 2; for (int i = 1; i < multiplier; i++) { File testInFile = new File(testFolder, "testIn.arrow"); @@ -404,41 +403,4 @@ public void testJSONRoundTripWithVariableWidthView() throws Exception { integration.run(args3); } } - - @Test - public void t2() throws ParseException, IOException { - File testInFile = - new File( - "/home/vibhatha/Documents/Work/Apache_Arrow/json_readers/failures/java_producing.arrow"); - File testJSONFile = new File(testFolder, "testOut.json"); - testJSONFile.delete(); - File testOutFile = new File(testFolder, "testOut.arrow"); - testOutFile.delete(); - - System.out.println(testJSONFile.getAbsolutePath()); - - Integration integration = new Integration(); - - // convert it to json - String[] args1 = { - "-arrow", - testInFile.getAbsolutePath(), - "-json", - testJSONFile.getAbsolutePath(), - "-command", - Command.ARROW_TO_JSON.name() - }; - integration.run(args1); - - // convert back to arrow - String[] args2 = { - "-arrow", - testOutFile.getAbsolutePath(), - "-json", - testJSONFile.getAbsolutePath(), - "-command", - Command.JSON_TO_ARROW.name() - }; - integration.run(args2); - } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthViewVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthViewVector.java index 1970283076c88..aea492b7d0696 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthViewVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthViewVector.java @@ -1761,8 +1761,7 @@ public void exportCDataBuffers(List buffers, ArrowBuf buffersPtr, long * @param index position of the element in the vector * @return array of bytes */ - public static byte[] get( - final ArrowBuf viewBuffer, final List dataBuffers, int index, boolean isView) { + public static byte[] get(final ArrowBuf viewBuffer, final List dataBuffers, int index) { final int dataLength = viewBuffer.getInt((long) index * ELEMENT_SIZE); byte[] result = new byte[dataLength]; if (dataLength > INLINE_SIZE) { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java index 0a84131507bce..997f199b677bd 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java @@ -25,6 +25,7 @@ import static org.apache.arrow.vector.BufferLayout.BufferType.TYPE; import static org.apache.arrow.vector.BufferLayout.BufferType.VALIDITY; import static org.apache.arrow.vector.BufferLayout.BufferType.VARIADIC_DATA_BUFFERS; +import static org.apache.arrow.vector.BufferLayout.BufferType.VIEWS; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; @@ -201,9 +202,7 @@ public boolean read(VectorSchemaRoot root) throws IOException { readFromJsonIntoVector(field, vector); } } - System.out.println(">>1 start"); readToken(END_ARRAY); - System.out.println(">>1 done"); root.setRowCount(count); } readToken(END_OBJECT); @@ -232,9 +231,7 @@ public VectorSchemaRoot read() throws IOException { readFromJsonIntoVector(field, vector); } } - System.out.println(">>2 start"); readToken(END_ARRAY); - System.out.println(">>2 done"); } readToken(END_OBJECT); return recordBatch; @@ -279,15 +276,14 @@ ArrowBuf readBuffer(BufferAllocator allocator, int count) throws IOException { } private abstract class DataBufferReader { - protected abstract ArrowBuf read(BufferAllocator allocator, int count) throws IOException; + protected abstract List read(BufferAllocator allocator, int variadicBuffersCount) + throws IOException; - List readBuffer(BufferAllocator allocator, List variadicElementCounts) + List readBuffer(BufferAllocator allocator, int variadicBuffersCount) throws IOException { readToken(START_ARRAY); - ArrayList dataBuffers = new ArrayList<>(variadicElementCounts.size()); - for (int i = 0; i < variadicElementCounts.size(); i++) { - dataBuffers.add(read(allocator, variadicElementCounts.get(i))); - } + ArrayList dataBuffers = new ArrayList<>(variadicBuffersCount); + dataBuffers.addAll(read(allocator, variadicBuffersCount)); readToken(END_ARRAY); return dataBuffers; } @@ -638,12 +634,20 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException }; } + class ViewBufferReader extends BufferReader { + + @Override + protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { + return null; + } + } + private ArrowBuf readViewIntoBuffer( BufferAllocator allocator, - BufferType bufferType, MinorType type, int count, - List dataBufferElementCounts) + List variadicBufferIndices, + List vectorTypes) throws IOException { BufferReader reader = new BufferReader() { @@ -654,21 +658,19 @@ private ArrowBuf readBinaryValues(BufferAllocator allocator, int count) long bufferSize = 0L; for (int i = 0; i < count; i++) { readToken(START_OBJECT); - byte[] value = new byte[BaseVariableWidthViewVector.ELEMENT_SIZE]; final int length = readNextField("SIZE", Integer.class); + byte[] value; if (length > BaseVariableWidthViewVector.INLINE_SIZE) { // PREFIX_HEX final byte[] prefix = decodeHexSafe(readNextField("PREFIX_HEX", String.class)); // BUFFER_INDEX final int bufferIndex = readNextField("BUFFER_INDEX", Integer.class); - if (dataBufferElementCounts.isEmpty()) { - dataBufferElementCounts.add(1); + if (variadicBufferIndices.isEmpty()) { + variadicBufferIndices.add(bufferIndex); } else { - if (bufferIndex < dataBufferElementCounts.size()) { - dataBufferElementCounts.set( - bufferIndex, dataBufferElementCounts.get(bufferIndex) + 1); - } else { - dataBufferElementCounts.add(bufferIndex, 1); + int lastBufferIndex = variadicBufferIndices.get(variadicBufferIndices.size() - 1); + if (lastBufferIndex != bufferIndex) { + variadicBufferIndices.add(bufferIndex); } } @@ -682,7 +684,6 @@ private ArrowBuf readBinaryValues(BufferAllocator allocator, int count) buffer.putInt(bufferIndex); // Write 'bufferIndex' to bytes 8-11 buffer.putInt(offset); // Write 'offset' to bytes 12-15 value = buffer.array(); // Convert the ByteBuffer to a byte array - } else { // in-line ByteBuffer buffer = @@ -714,6 +715,10 @@ private ArrowBuf readBinaryValues(BufferAllocator allocator, int count) buf.writeBytes(value); } + if (!variadicBufferIndices.isEmpty()) { + // we have variadic buffers + vectorTypes.add(VARIADIC_DATA_BUFFERS); + } return buf; } @@ -726,47 +731,33 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException } private List readDataBufferIntoBuffer( - BufferAllocator allocator, MinorType type, List dataBufferElementCounts) - throws IOException { + BufferAllocator allocator, int variadicBufferCount) throws IOException { DataBufferReader reader = new DataBufferReader() { - private ArrowBuf readBinaryValues(BufferAllocator allocator, int count) + @Override + protected List read(BufferAllocator allocator, int variadicBuffersCount) throws IOException { - ArrayList values = new ArrayList<>(count); - long bufferSize = 0L; - - for (int i = 0; i < count; i++) { + List dataBuffers = new ArrayList<>(variadicBuffersCount); + for (int i = 0; i < variadicBuffersCount; i++) { parser.nextToken(); final byte[] value; - if (type == MinorType.VIEWVARCHAR) { - value = parser.getValueAsString().getBytes(StandardCharsets.UTF_8); + + String variadicStr = parser.readValueAs(String.class); + if (variadicStr == null) { + value = new byte[0]; } else { - String variadicStr = parser.readValueAs(String.class); - if (variadicStr == null) { - value = new byte[0]; - } else { - value = decodeHexSafe(variadicStr); - } + value = decodeHexSafe(variadicStr); } - values.add(value); - bufferSize += value.length; - } - - ArrowBuf buf = allocator.buffer(bufferSize); - for (byte[] value : values) { + ArrowBuf buf = allocator.buffer(value.length); buf.writeBytes(value); + dataBuffers.add(buf); } - return buf; - } - - @Override - protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException { - return readBinaryValues(allocator, count); + return dataBuffers; } }; - return reader.readBuffer(allocator, dataBufferElementCounts); + return reader.readBuffer(allocator, variadicBufferCount); } private ArrowBuf readIntoBuffer( @@ -885,6 +876,10 @@ private ArrowBuf readIntoBuffer( default: throw new UnsupportedOperationException("Cannot read array of type " + type); } + } else if (bufferType.equals(VIEWS)) { + // no-op + } else if (bufferType.equals(VARIADIC_DATA_BUFFERS)) { + // no-op } else { throw new InvalidArrowFileException("Unrecognized buffer type " + bufferType); } @@ -895,6 +890,61 @@ private ArrowBuf readIntoBuffer( return buf; } + private void handleVector( + BufferAllocator allocator, + BufferType bufferType, + FieldVector vector, + int valueCount, + int innerBufferValueCount, + List vectorBuffers, + List vectorTypes, + int v, + List variadicBufferIndices) + throws IOException { + if (vector instanceof BaseVariableWidthViewVector) { + handleBaseVariableWidthViewVector( + allocator, + bufferType, + vector, + valueCount, + variadicBufferIndices, + vectorBuffers, + vectorTypes, + v); + } else { + vectorBuffers.add( + readIntoBuffer(allocator, bufferType, vector.getMinorType(), innerBufferValueCount)); + } + } + + private void handleBaseVariableWidthViewVector( + BufferAllocator allocator, + BufferType bufferType, + FieldVector vector, + int valueCount, + List variadicBufferIndices, + List vectorBuffers, + List vectorTypes, + int v) + throws IOException { + switch (v) { + case 0: + // read validity buffer + vectorBuffers.add(readIntoBuffer(allocator, bufferType, vector.getMinorType(), valueCount)); + break; + case 1: + // read view buffer + vectorBuffers.add( + readViewIntoBuffer( + allocator, vector.getMinorType(), valueCount, variadicBufferIndices, vectorTypes)); + break; + default: + // read data buffers + vectorBuffers.addAll(readDataBufferIntoBuffer(allocator, variadicBufferIndices.size())); + break; + } + } + private void readFromJsonIntoVector(Field field, FieldVector vector) throws JsonParseException, IOException { ArrowType type = field.getType(); @@ -902,8 +952,9 @@ private void readFromJsonIntoVector(Field field, FieldVector vector) TypeLayout typeLayout = TypeLayout.getTypeLayout(type); List vectorTypes = typeLayout.getBufferTypes(); List vectorBuffers = new ArrayList<>(vectorTypes.size()); + List variadicBufferIndices = new ArrayList<>(); /* - * The order of inner buffers is : + * The order of inner buffers is: * Fixed width vector: * -- validity buffer * -- data buffer @@ -917,7 +968,6 @@ private void readFromJsonIntoVector(Field field, FieldVector vector) * locally as we read from Json parser and do loadFieldBuffers on the vector followed by * releasing the local buffers. */ - List dataBufferElementCounts = new ArrayList<>(); readToken(START_OBJECT); { // If currently reading dictionaries, field name is not important so don't check @@ -943,34 +993,16 @@ private void readFromJsonIntoVector(Field field, FieldVector vector) innerBufferValueCount = valueCount + 1; } - if (vector instanceof BaseVariableWidthViewVector) { - if (v == 0) { - // read validity buffer - vectorBuffers.add( - readIntoBuffer(allocator, bufferType, vector.getMinorType(), valueCount)); - } else if (v == 1) { - // read view buffer - vectorBuffers.add( - readViewIntoBuffer( - allocator, - bufferType, - vector.getMinorType(), - valueCount, - dataBufferElementCounts)); - if (!dataBufferElementCounts.isEmpty()) { - // we have variadic buffers - vectorTypes.add(VARIADIC_DATA_BUFFERS); - } - } else { - // read data buffers - List variadicBuffers = - readDataBufferIntoBuffer(allocator, vector.getMinorType(), dataBufferElementCounts); - vectorBuffers.addAll(variadicBuffers); - } - } else { - vectorBuffers.add( - readIntoBuffer(allocator, bufferType, vector.getMinorType(), innerBufferValueCount)); - } + handleVector( + allocator, + bufferType, + vector, + valueCount, + innerBufferValueCount, + vectorBuffers, + vectorTypes, + v, + variadicBufferIndices); } int nullCount = 0; @@ -1003,13 +1035,11 @@ private void readFromJsonIntoVector(Field field, FieldVector vector) readToken(END_ARRAY); } } - if (dataBufferElementCounts.isEmpty() && vector instanceof BaseVariableWidthViewVector) { + if (variadicBufferIndices.isEmpty() && vector instanceof BaseVariableWidthViewVector) { // facilitating the reading of empty databuffer parser.nextToken(); readToken(START_ARRAY); - System.out.println(">>5 start!! "); readToken(END_ARRAY); - System.out.println(">>5 done!!"); } readToken(END_OBJECT); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java index 0b48133eb1d2d..792445731c519 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java @@ -246,16 +246,20 @@ private void writeFromVectorIntoJson(Field field, FieldVector vector) throws IOE && (vector.getMinorType() == MinorType.VIEWVARCHAR || vector.getMinorType() == MinorType.VIEWVARBINARY)) { // writing views - ArrowBuf viewBuffer = vectorBuffers.get(v); + ArrowBuf viewBuffer = vectorBuffers.get(1); List dataBuffers = vectorBuffers.subList(v + 1, vectorBuffers.size()); writeValueToViewGenerator(bufferType, viewBuffer, dataBuffers, vector, i); } else if (bufferType.equals(VARIADIC_DATA_BUFFERS) && (vector.getMinorType() == MinorType.VIEWVARCHAR || vector.getMinorType() == MinorType.VIEWVARBINARY)) { - ArrowBuf viewBuffer = vectorBuffers.get(1); + ArrowBuf viewBuffer = vectorBuffers.get(1); // check if this is v-1 List dataBuffers = vectorBuffers.subList(v, vectorBuffers.size()); if (!dataBuffers.isEmpty()) { - writeValueToDataBufferGenerator(bufferType, viewBuffer, dataBuffers, vector, i); + writeValueToDataBufferGenerator(bufferType, viewBuffer, dataBuffers, vector); + // The variadic buffers are written at once and doesn't require iterating for + // each index. + // So, break the loop. + break; } } else if (bufferType.equals(OFFSET) && vector.getValueCount() == 0 @@ -319,7 +323,7 @@ private void writeValueToViewGenerator( final int index) throws IOException { Preconditions.checkNotNull(viewBuffer); - byte[] b = (BaseVariableWidthViewVector.get(viewBuffer, dataBuffers, index, true)); + byte[] b = (BaseVariableWidthViewVector.get(viewBuffer, dataBuffers, index)); final int elementSize = BaseVariableWidthViewVector.ELEMENT_SIZE; final int lengthWidth = BaseVariableWidthViewVector.LENGTH_WIDTH; final int prefixWidth = BaseVariableWidthViewVector.PREFIX_WIDTH; @@ -355,80 +359,19 @@ private void writeValueToViewGenerator( } private void writeValueToDataBufferGenerator( - BufferType bufferType, - ArrowBuf viewBuffer, - List dataBuffers, - FieldVector vector, - final int index) + BufferType bufferType, ArrowBuf viewBuffer, List dataBuffers, FieldVector vector) throws IOException { if (bufferType.equals(VARIADIC_DATA_BUFFERS)) { Preconditions.checkNotNull(viewBuffer); Preconditions.checkArgument(!dataBuffers.isEmpty()); - byte[] b = BaseVariableWidthViewVector.get(viewBuffer, dataBuffers, index, false); - if (b.length > BaseVariableWidthViewVector.INLINE_SIZE) { - switch (vector.getMinorType()) { - case VIEWVARCHAR: - { - if (b != null) { - generator.writeString(new String(b, "UTF-8")); - } - break; - } - case VIEWVARBINARY: - { - if (b != null) { - String hexString = Hex.encodeHexString(b); - generator.writeObject(hexString); - } - break; - } - default: - throw new UnsupportedOperationException("minor type: " + vector.getMinorType()); - } - } - } - } - private void writeValueToVariadicGenerator( - BufferType bufferType, - ArrowBuf viewBuffer, - List dataBuffers, - FieldVector vector, - final int index, - boolean isView) - throws IOException { - if (bufferType.equals(DATA)) { - switch (vector.getMinorType()) { - case VIEWVARCHAR: - { - Preconditions.checkNotNull(viewBuffer); - byte[] b = (BaseVariableWidthViewVector.get(viewBuffer, dataBuffers, index, isView)); - if (isView) { - - generator.writeBinary(b); - } else { - if (b != null) { - generator.writeString(new String(b, "UTF-8")); - } - } - break; - } - case VIEWVARBINARY: - { - Preconditions.checkNotNull(viewBuffer); - byte[] b = BaseVariableWidthViewVector.get(viewBuffer, dataBuffers, index, isView); - if (isView) { - generator.writeBinary(b); - } else { - if (b != null) { - String hexString = Hex.encodeHexString(b); - generator.writeObject(hexString); - } - } - break; - } - default: - throw new UnsupportedOperationException("minor type: " + vector.getMinorType()); + for (int i = 0; i < dataBuffers.size(); i++) { + ArrowBuf dataBuf = dataBuffers.get(i); + byte[] result = new byte[(int) dataBuf.writerIndex()]; + dataBuf.getBytes(0, result); + if (result != null) { + generator.writeString(Hex.encodeHexString(result)); + } } } }