Skip to content

Commit

Permalink
Fix ReadableByteChannel.read usages (#5089)
Browse files Browse the repository at this point in the history
Fixes #5088
  • Loading branch information
devinrsmith committed Jan 31, 2024
1 parent 17a1162 commit 7f1a9a4
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,16 @@ public static QueryTable readFeather(@NotNull final String path) {
channel.position(block.getOffset());

final ByteBuffer metadataBuf = ByteBuffer.wrap(rawMetadataBuf, 0, block.getMetadataLength());
int numRead = channel.read(metadataBuf);
if (numRead != block.getMetadataLength()) {
throw new IOException("Unexpected end of input trying to read block " + ii + " of '" + path + "'");
while (metadataBuf.hasRemaining()) {
final int read = channel.read(metadataBuf);
if (read == 0) {
throw new IllegalStateException("ReadableByteChannel.read returned 0");
}
if (read == -1) {
throw new IOException(
"Unexpected end of input trying to read block " + ii + " of '" + path + "'");
}
}

metadataBuf.flip();
if (metadataBuf.getInt() == IPC_CONTINUATION_TOKEN) {
// if the continuation token is present, skip the length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ private void ensurePageHeader(final SeekableByteChannel file) throws IOException
boolean success;
do {
final ByteBuffer headerBuffer = ByteBuffer.allocate(maxHeader);
file.read(headerBuffer);
Helpers.readExact(file, headerBuffer);
headerBuffer.flip();

final ByteBufferInputStream bufferedIS = ByteBufferInputStream.wrap(headerBuffer);
try {
pageHeader = Util.readPageHeader(bufferedIS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,7 @@ private Set<String> calculateColumnsWithDictionaryUsedOnEveryDataPage() {
private int readIntLittleEndian(SeekableByteChannel f) throws IOException {
ByteBuffer tempBuf = ByteBuffer.allocate(Integer.BYTES);
tempBuf.order(ByteOrder.LITTLE_ENDIAN);
int read = f.read(tempBuf);
if (read != 4) {
throw new IOException("Expected four bytes, only read " + read);
}
Helpers.readExact(f, tempBuf);
tempBuf.flip();
return tempBuf.getInt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,41 @@

public class Helpers {
public static void readBytes(ReadableByteChannel f, byte[] buffer) throws IOException {
int read = f.read(ByteBuffer.wrap(buffer));
if (read != buffer.length) {
throw new IOException("Expected for bytes, only read " + read + " while it expected " + buffer.length);
}
readExact(f, ByteBuffer.wrap(buffer));
}

public static BytesInput readBytes(ReadableByteChannel f, int expected) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(expected);
int read = f.read(buffer);
if (read != expected) {
throw new IOException("Expected for bytes, only read " + read + " while it expected " + expected);
}
readExact(f, buffer);
buffer.flip();
return BytesInput.from(buffer);
}

/**
* Reads exactly {@code dst.remaining()} bytes from the blocking {@code channel} into {@code dst}. It is required
* that {@code channel} is in blocking mode, and thus will always return a non-zero
* {@link ReadableByteChannel#read(ByteBuffer)}.
*
* @param channel the readable channel
* @param dst the destination buffer
* @throws IOException if an IO error occurs
*/
public static void readExact(ReadableByteChannel channel, ByteBuffer dst) throws IOException {
final int expected = dst.remaining();
while (dst.hasRemaining()) {
final int read = channel.read(dst);
if (read == 0) {
throw new IllegalStateException(
"ReadableByteChannel.read returned 0. Either the caller has broken the contract and passed in a non-blocking channel, or the blocking channel implementation is incorrectly returning 0.");
}
if (read == -1) {
throw new EOFException(
String.format("Reached end-of-stream before completing, expected=%d, remaining=%d", expected,
dst.remaining()));
}
}
}

static int readUnsignedVarInt(ByteBuffer in) {
int value = 0;
int i = 0;
Expand Down

0 comments on commit 7f1a9a4

Please sign in to comment.