diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputStream.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputStream.java index 0629db6831..14f8a509ff 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputStream.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputStream.java @@ -129,11 +129,15 @@ public int read() throws IOException { Preconditions.checkState(!closed, "Cannot read: already closed"); positionStream(); - pos += 1; - next += 1; - bytesCounter.increment(); + final int byteRead = stream.read(); - return stream.read(); + if (byteRead != -1) { + pos += 1; + next += 1; + bytesCounter.increment(); + } + + return byteRead; } /** @@ -161,10 +165,13 @@ public int read(byte[] b, int off, int len) throws IOException { Preconditions.checkState(!closed, "Cannot read: already closed"); positionStream(); - int bytesRead = stream.read(b, off, len); - pos += bytesRead; - next += bytesRead; - bytesCounter.add(bytesRead); + final int bytesRead = stream.read(b, off, len); + + if (bytesRead > 0) { + pos += bytesRead; + next += bytesRead; + bytesCounter.add(bytesRead); + } return bytesRead; } @@ -203,9 +210,11 @@ public int readNBytes(byte[] b, int off, int len) throws IOException { final int bytesRead = stream.readNBytes(b, off, len); - pos += bytesRead; - next += bytesRead; - bytesCounter.add(bytesRead); + if (bytesRead > 0) { + pos += bytesRead; + next += bytesRead; + bytesCounter.add(bytesRead); + } return bytesRead; } @@ -325,9 +334,11 @@ public void readFully(byte[] bytes, int start, int len) throws IOException { int bytesRead = readFully(stream, bytes, start, len); - this.pos += bytesRead; - this.next += bytesRead; - this.bytesCounter.add(bytesRead); + if (bytesRead > 0) { + this.pos += bytesRead; + this.next += bytesRead; + this.bytesCounter.add(bytesRead); + } } /** @@ -354,9 +365,11 @@ public int read(ByteBuffer buf) throws IOException { bytesRead = readDirectBuffer(stream, buf, temp); } - this.pos += bytesRead; - this.next += bytesRead; - this.bytesCounter.add(bytesRead); + if (bytesRead > 0) { + this.pos += bytesRead; + this.next += bytesRead; + this.bytesCounter.add(bytesRead); + } return bytesRead; } @@ -385,9 +398,11 @@ public void readFully(ByteBuffer buf) throws IOException { bytesRead = readFullyDirectBuffer(stream, buf, temp); } - this.pos += bytesRead; - this.next += bytesRead; - this.bytesCounter.add(bytesRead); + if (bytesRead > 0) { + this.pos += bytesRead; + this.next += bytesRead; + this.bytesCounter.add(bytesRead); + } } /** @@ -478,7 +493,7 @@ private void closeStream() throws IOException { */ private void abortStream() { try { - if (stream instanceof Abortable && stream.read() != -1) { + if (stream instanceof Abortable) { ((Abortable) stream).abort(); } } catch (Exception e) { diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputStreamTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputStreamTest.java index 1ac31890ee..d75254189d 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputStreamTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputStreamTest.java @@ -15,6 +15,7 @@ import software.amazon.awssdk.services.s3.model.S3Exception; import java.io.ByteArrayInputStream; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -103,6 +104,19 @@ void testRead() throws IOException { verify(s3ObjectSizeProcessedSummary).record(1.0); } + @Test + void testReadEndOfFile() throws IOException { + InputStream inputStream = new ByteArrayInputStream("".getBytes()); + when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + + int firstByte = s3InputStream.read(); + assertEquals(-1, firstByte); + + s3InputStream.close(); + + verify(s3ObjectSizeProcessedSummary).record(0.0); + } + @Test void testReadByteArray() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); @@ -150,6 +164,20 @@ void testReadNBytes_intoArray() throws Exception { verify(s3ObjectSizeProcessedSummary).record(4.0); } + @Test + void testReadNBytes_endOfFile() throws Exception { + InputStream inputStream = new ByteArrayInputStream("".getBytes()); + when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + + byte[] buffer = new byte[9]; + int bytesRead = s3InputStream.readNBytes(buffer, 0, 4); + + assertEquals(0, bytesRead); + + s3InputStream.close(); + verify(s3ObjectSizeProcessedSummary).record(0.0); + } + @Test void testReadNBytes_getArray() throws Exception { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); @@ -242,6 +270,19 @@ void testReadFullyByteBuffer() throws IOException { verify(s3ObjectSizeProcessedSummary).record(4.0); } + @Test + void testReadFullyByteBuffer_endOfFile() throws IOException { + InputStream inputStream = new ByteArrayInputStream("".getBytes()); + when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + s3InputStream.seek(0); // Force opening the stream + + ByteBuffer buffer = ByteBuffer.allocate(4); + assertThrows(EOFException.class, () -> s3InputStream.readFully(buffer)); + + s3InputStream.close(); + verify(s3ObjectSizeProcessedSummary).record(0.0); + } + @Test void testReadFullyHeapBuffer() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes());