Skip to content

Commit

Permalink
Fix S3 errors around end of file behavior. (opensearch-project#2983)
Browse files Browse the repository at this point in the history
Signed-off-by: Adi Suresh <[email protected]>
  • Loading branch information
asuresh8 committed Jul 5, 2023
1 parent 92af936 commit 75fa735
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,15 @@ public int read() throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

pos += 1;
next += 1;
bytesCounter.increment();
final int byteRead = stream.read();

return stream.read();
if (byteRead != -1) {
pos += 1;
next += 1;
bytesCounter.increment();
}

return byteRead;
}

/**
Expand Down Expand Up @@ -161,10 +165,13 @@ public int read(byte[] b, int off, int len) throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

int bytesRead = stream.read(b, off, len);
pos += bytesRead;
next += bytesRead;
bytesCounter.add(bytesRead);
final int bytesRead = stream.read(b, off, len);

if (bytesRead > 0) {
pos += bytesRead;
next += bytesRead;
bytesCounter.add(bytesRead);
}

return bytesRead;
}
Expand Down Expand Up @@ -203,9 +210,11 @@ public int readNBytes(byte[] b, int off, int len) throws IOException {

final int bytesRead = stream.readNBytes(b, off, len);

pos += bytesRead;
next += bytesRead;
bytesCounter.add(bytesRead);
if (bytesRead > 0) {
pos += bytesRead;
next += bytesRead;
bytesCounter.add(bytesRead);
}

return bytesRead;
}
Expand Down Expand Up @@ -325,9 +334,11 @@ public void readFully(byte[] bytes, int start, int len) throws IOException {

int bytesRead = readFully(stream, bytes, start, len);

this.pos += bytesRead;
this.next += bytesRead;
this.bytesCounter.add(bytesRead);
if (bytesRead > 0) {
this.pos += bytesRead;
this.next += bytesRead;
this.bytesCounter.add(bytesRead);
}
}

/**
Expand All @@ -354,9 +365,11 @@ public int read(ByteBuffer buf) throws IOException {
bytesRead = readDirectBuffer(stream, buf, temp);
}

this.pos += bytesRead;
this.next += bytesRead;
this.bytesCounter.add(bytesRead);
if (bytesRead > 0) {
this.pos += bytesRead;
this.next += bytesRead;
this.bytesCounter.add(bytesRead);
}

return bytesRead;
}
Expand Down Expand Up @@ -385,9 +398,11 @@ public void readFully(ByteBuffer buf) throws IOException {
bytesRead = readFullyDirectBuffer(stream, buf, temp);
}

this.pos += bytesRead;
this.next += bytesRead;
this.bytesCounter.add(bytesRead);
if (bytesRead > 0) {
this.pos += bytesRead;
this.next += bytesRead;
this.bytesCounter.add(bytesRead);
}
}

/**
Expand Down Expand Up @@ -478,7 +493,7 @@ private void closeStream() throws IOException {
*/
private void abortStream() {
try {
if (stream instanceof Abortable && stream.read() != -1) {
if (stream instanceof Abortable) {
((Abortable) stream).abort();
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import software.amazon.awssdk.services.s3.model.S3Exception;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -103,6 +104,19 @@ void testRead() throws IOException {
verify(s3ObjectSizeProcessedSummary).record(1.0);
}

@Test
void testReadEndOfFile() throws IOException {
InputStream inputStream = new ByteArrayInputStream("".getBytes());
when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream);

int firstByte = s3InputStream.read();
assertEquals(-1, firstByte);

s3InputStream.close();

verify(s3ObjectSizeProcessedSummary).record(0.0);
}

@Test
void testReadByteArray() throws IOException {
InputStream inputStream = new ByteArrayInputStream("Test data".getBytes());
Expand Down Expand Up @@ -150,6 +164,20 @@ void testReadNBytes_intoArray() throws Exception {
verify(s3ObjectSizeProcessedSummary).record(4.0);
}

@Test
void testReadNBytes_endOfFile() throws Exception {
InputStream inputStream = new ByteArrayInputStream("".getBytes());
when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream);

byte[] buffer = new byte[9];
int bytesRead = s3InputStream.readNBytes(buffer, 0, 4);

assertEquals(0, bytesRead);

s3InputStream.close();
verify(s3ObjectSizeProcessedSummary).record(0.0);
}

@Test
void testReadNBytes_getArray() throws Exception {
InputStream inputStream = new ByteArrayInputStream("Test data".getBytes());
Expand Down Expand Up @@ -242,6 +270,19 @@ void testReadFullyByteBuffer() throws IOException {
verify(s3ObjectSizeProcessedSummary).record(4.0);
}

@Test
void testReadFullyByteBuffer_endOfFile() throws IOException {
InputStream inputStream = new ByteArrayInputStream("".getBytes());
when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream);
s3InputStream.seek(0); // Force opening the stream

ByteBuffer buffer = ByteBuffer.allocate(4);
assertThrows(EOFException.class, () -> s3InputStream.readFully(buffer));

s3InputStream.close();
verify(s3ObjectSizeProcessedSummary).record(0.0);
}

@Test
void testReadFullyHeapBuffer() throws IOException {
InputStream inputStream = new ByteArrayInputStream("Test data".getBytes());
Expand Down

0 comments on commit 75fa735

Please sign in to comment.