From 7f1a9a4f019ce131c4780fe5fb7555f2c20f601e Mon Sep 17 00:00:00 2001 From: Devin Smith Date: Wed, 31 Jan 2024 10:56:44 -0800 Subject: [PATCH] Fix ReadableByteChannel.read usages (#5089) Fixes #5088 --- .../extensions/arrow/ArrowWrapperTools.java | 13 ++++--- .../parquet/base/ColumnPageReaderImpl.java | 3 +- .../parquet/base/ParquetFileReader.java | 5 +-- .../deephaven/parquet/base/util/Helpers.java | 35 ++++++++++++++----- 4 files changed, 39 insertions(+), 17 deletions(-) diff --git a/extensions/arrow/src/main/java/io/deephaven/extensions/arrow/ArrowWrapperTools.java b/extensions/arrow/src/main/java/io/deephaven/extensions/arrow/ArrowWrapperTools.java index 041cf9d5bbb..27d1870f2d1 100644 --- a/extensions/arrow/src/main/java/io/deephaven/extensions/arrow/ArrowWrapperTools.java +++ b/extensions/arrow/src/main/java/io/deephaven/extensions/arrow/ArrowWrapperTools.java @@ -157,11 +157,16 @@ public static QueryTable readFeather(@NotNull final String path) { channel.position(block.getOffset()); final ByteBuffer metadataBuf = ByteBuffer.wrap(rawMetadataBuf, 0, block.getMetadataLength()); - int numRead = channel.read(metadataBuf); - if (numRead != block.getMetadataLength()) { - throw new IOException("Unexpected end of input trying to read block " + ii + " of '" + path + "'"); + while (metadataBuf.hasRemaining()) { + final int read = channel.read(metadataBuf); + if (read == 0) { + throw new IllegalStateException("ReadableByteChannel.read returned 0"); + } + if (read == -1) { + throw new IOException( + "Unexpected end of input trying to read block " + ii + " of '" + path + "'"); + } } - metadataBuf.flip(); if (metadataBuf.getInt() == IPC_CONTINUATION_TOKEN) { // if the continuation token is present, skip the length diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java index 89dd0b6ea3c..f84458e2d3a 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnPageReaderImpl.java @@ -144,8 +144,9 @@ private void ensurePageHeader(final SeekableByteChannel file) throws IOException boolean success; do { final ByteBuffer headerBuffer = ByteBuffer.allocate(maxHeader); - file.read(headerBuffer); + Helpers.readExact(file, headerBuffer); headerBuffer.flip(); + final ByteBufferInputStream bufferedIS = ByteBufferInputStream.wrap(headerBuffer); try { pageHeader = Util.readPageHeader(bufferedIS); diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java index 2156bf2a299..5421d133a9b 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ParquetFileReader.java @@ -183,10 +183,7 @@ private Set calculateColumnsWithDictionaryUsedOnEveryDataPage() { private int readIntLittleEndian(SeekableByteChannel f) throws IOException { ByteBuffer tempBuf = ByteBuffer.allocate(Integer.BYTES); tempBuf.order(ByteOrder.LITTLE_ENDIAN); - int read = f.read(tempBuf); - if (read != 4) { - throw new IOException("Expected four bytes, only read " + read); - } + Helpers.readExact(f, tempBuf); tempBuf.flip(); return tempBuf.getInt(); } diff --git a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/util/Helpers.java b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/util/Helpers.java index 9434bedf0da..18864ad2f96 100644 --- a/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/util/Helpers.java +++ b/extensions/parquet/base/src/main/java/io/deephaven/parquet/base/util/Helpers.java @@ -15,22 +15,41 @@ public class Helpers { public static void readBytes(ReadableByteChannel f, byte[] buffer) throws IOException { - int read = f.read(ByteBuffer.wrap(buffer)); - if (read != buffer.length) { - throw new IOException("Expected for bytes, only read " + read + " while it expected " + buffer.length); - } + readExact(f, ByteBuffer.wrap(buffer)); } public static BytesInput readBytes(ReadableByteChannel f, int expected) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(expected); - int read = f.read(buffer); - if (read != expected) { - throw new IOException("Expected for bytes, only read " + read + " while it expected " + expected); - } + readExact(f, buffer); buffer.flip(); return BytesInput.from(buffer); } + /** + * Reads exactly {@code dst.remaining()} bytes from the blocking {@code channel} into {@code dst}. It is required + * that {@code channel} is in blocking mode, and thus will always return a non-zero + * {@link ReadableByteChannel#read(ByteBuffer)}. + * + * @param channel the readable channel + * @param dst the destination buffer + * @throws IOException if an IO error occurs + */ + public static void readExact(ReadableByteChannel channel, ByteBuffer dst) throws IOException { + final int expected = dst.remaining(); + while (dst.hasRemaining()) { + final int read = channel.read(dst); + if (read == 0) { + throw new IllegalStateException( + "ReadableByteChannel.read returned 0. Either the caller has broken the contract and passed in a non-blocking channel, or the blocking channel implementation is incorrectly returning 0."); + } + if (read == -1) { + throw new EOFException( + String.format("Reached end-of-stream before completing, expected=%d, remaining=%d", expected, + dst.remaining())); + } + } + } + static int readUnsignedVarInt(ByteBuffer in) { int value = 0; int i = 0;