Skip to content

Commit

Permalink
Handles the case where the binary cursor's InputStream provides fewer…
Browse files Browse the repository at this point in the history
… bytes than requested before reaching EOF. (#623)
  • Loading branch information
tgregg authored Nov 3, 2023
1 parent ab84b08 commit 61d76e0
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 18 deletions.
45 changes: 27 additions & 18 deletions src/com/amazon/ion/impl/IonCursorBinary.java
Original file line number Diff line number Diff line change
Expand Up @@ -524,8 +524,7 @@ private boolean fillAt(long index, long numberOfBytes) {
refillableState.bytesRequested = numberOfBytes + (index - offset);
if (ensureCapacity(refillableState.bytesRequested)) {
// Fill all the free space, not just the shortfall; this reduces I/O.
refill(freeSpaceAt(limit));
shortfall = refillableState.bytesRequested - availableAt(offset);
shortfall = refill(refillableState.bytesRequested);
} else {
// The request cannot be satisfied, but not because data was unavailable. Return normally; it is the
// caller's responsibility to recover.
Expand Down Expand Up @@ -646,24 +645,34 @@ private void shiftIndicesLeft(int shiftAmount) {
}

/**
* Fills the buffer with up to the requested number of additional bytes. It is the caller's responsibility to
* ensure that there is space in the buffer.
* @param numberOfBytesToFill the number of additional bytes to attempt to add to the buffer.
* Attempts to fill the buffer with up to the requested number of additional bytes. It is the caller's
* responsibility to ensure that there is space in the buffer.
* @param minimumNumberOfBytesRequired the minimum number of bytes requested to fill the current value.
* @return the shortfall between the number of bytes that were filled and the minimum number requested. If less than
* 1, then at least `minimumNumberOfBytesRequired` were filled.
*/
private void refill(long numberOfBytesToFill) {
private long refill(long minimumNumberOfBytesRequired) {
int numberOfBytesFilled = -1;
try {
numberOfBytesFilled = refillableState.inputStream.read(buffer, (int) limit, (int) numberOfBytesToFill);
} catch (EOFException e) {
// Certain InputStream implementations (e.g. GZIPInputStream) throw EOFException if more bytes are requested
// to read than are currently available (e.g. if a header or trailer is incomplete).
} catch (IOException e) {
throwAsIonException(e);
}
if (numberOfBytesFilled < 0) {
return;
}
limit += numberOfBytesFilled;
long shortfall;
// Sometimes an InputStream implementation will return fewer than the number of bytes requested even
// if the stream is not at EOF. If this happens and there is still a shortfall, keep requesting bytes
// until either the shortfall is filled or EOF is reached.
do {
try {
numberOfBytesFilled = refillableState.inputStream.read(buffer, (int) limit, (int) freeSpaceAt(limit));
} catch (EOFException e) {
// Certain InputStream implementations (e.g. GZIPInputStream) throw EOFException if more bytes are requested
// to read than are currently available (e.g. if a header or trailer is incomplete).
numberOfBytesFilled = -1;
} catch (IOException e) {
throwAsIonException(e);
}
if (numberOfBytesFilled > 0) {
limit += numberOfBytesFilled;
}
shortfall = minimumNumberOfBytesRequired - availableAt(offset);
} while (shortfall > 0 && numberOfBytesFilled >= 0);
return shortfall;
}

/**
Expand Down
108 changes: 108 additions & 0 deletions test/com/amazon/ion/impl/IonReaderContinuableTopLevelBinaryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
Expand Down Expand Up @@ -3196,6 +3198,112 @@ public void skipWithoutEnoughDataNonIncrementalFails() throws Exception {
reader.close();
}

/**
* An InputStream that returns less than the number of bytes requested from bulk reads.
*/
private static class ThrottlingInputStream extends InputStream {

private final byte[] data;
private final boolean throwFromReadOnEof;
private int offset = 0;

/**
* @param data the data for the InputStream to provide.
* @param throwFromReadOnEof true if the stream should throw {@link java.io.EOFException} when read() is called
* at EOF. If false, simply returns -1.
*/
protected ThrottlingInputStream(byte[] data, boolean throwFromReadOnEof) {
this.data = data;
this.throwFromReadOnEof = throwFromReadOnEof;
}

@Override
public int read() {
return data[offset++] & 0xFF;
}

private int calculateNumberOfBytesToReturn(int numberOfBytesRequested) {
int available = data.length - offset;
int numberOfBytesToReturn;
if (available > 1 && numberOfBytesRequested > 1) {
// Return fewer bytes than requested and fewer than are available, avoiding EOF.
numberOfBytesToReturn = Math.min(available - 1, numberOfBytesRequested - 1);
} else if (available <= 0) {
return -1; // EOF
} else {
// Only 1 byte is available, so return it as long as at least 1 byte was requested.
numberOfBytesToReturn = Math.min(numberOfBytesRequested, available);
}
return numberOfBytesToReturn;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (off + len > b.length) {
throw new IndexOutOfBoundsException();
}
int numberOfBytesToReturn = calculateNumberOfBytesToReturn(len);
if (numberOfBytesToReturn < 0) {
if (throwFromReadOnEof) {
throw new EOFException();
}
return -1;
}
System.arraycopy(data, offset, b, off, numberOfBytesToReturn);
offset += numberOfBytesToReturn;
return numberOfBytesToReturn;
}

@Override
public long skip(long len) {
int numberOfBytesToSkip = calculateNumberOfBytesToReturn((int) len);
offset += numberOfBytesToSkip;
return numberOfBytesToSkip;
}
}

@ParameterizedTest(name = "incrementalReadingEnabled={0},throwOnEof={1}")
@CsvSource({
"true, true",
"true, false",
"false, true",
"false, false"
})
public void shouldNotFailWhenAnInputStreamProvidesFewerBytesThanRequestedWithoutReachingEof(boolean incrementalReadingEnabled, boolean throwOnEof) throws Exception {
readerBuilder = readerBuilder.withIncrementalReadingEnabled(incrementalReadingEnabled)
.withBufferConfiguration(IonBufferConfiguration.Builder.standard().withInitialBufferSize(8).build());
reader = readerFor(new ThrottlingInputStream(bytes(0xE0, 0x01, 0x00, 0xEA, 0x89, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'), throwOnEof));
assertSequence(
next(IonType.STRING), stringValue("abcdefghi"),
next(null)
);
reader.close();
}

@Test
public void shouldNotFailWhenAnInputStreamProvidesFewerBytesThanRequestedWithoutReachingEofAndTheReaderSkipsTheValue() throws Exception {
reader = boundedReaderFor(new ThrottlingInputStream(bytes(0xE0, 0x01, 0x00, 0xEA, 0x89, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 0x20), false), 8, 8, byteAndOversizedValueCountingHandler);
assertSequence(
next(IonType.INT), intValue(0),
next(null)
);
reader.close();
assertEquals(1, oversizedCounter.get());
}

@Test
public void shouldNotFailWhenGZIPBoundaryIsEncounteredInStringValue() throws Exception {
ResizingPipedInputStream pipe = new ResizingPipedInputStream(128);
// The following lines create a GZIP payload boundary (trailer/header) in the middle of an Ion string value.
pipe.receive(gzippedBytes(0xE0, 0x01, 0x00, 0xEA, 0x89, 'a', 'b'));
pipe.receive(gzippedBytes('c', 'd', 'e', 'f', 'g', 'h', 'i'));
reader = readerFor(new GZIPInputStream(pipe));
assertSequence(
next(IonType.STRING), stringValue("abcdefghi"),
next(null)
);
}

@Test
public void concatenatedAfterGZIPHeader() throws Exception {
// Tests that a stream that initially contains only a GZIP header can be read successfully if more data
Expand Down

0 comments on commit 61d76e0

Please sign in to comment.