diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputFile.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputFile.java index 0f72bf1a40..8e0d1c4234 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputFile.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputFile.java @@ -2,6 +2,7 @@ import org.apache.parquet.io.SeekableInputStream; import org.opensearch.dataprepper.model.io.InputFile; +import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; @@ -18,17 +19,20 @@ public class S3InputFile implements InputFile { private final S3ObjectReference s3ObjectReference; + private final BucketOwnerProvider bucketOwnerProvider; private final S3ObjectPluginMetrics s3ObjectPluginMetrics; private HeadObjectResponse metadata; public S3InputFile( - final S3Client s3Client, - final S3ObjectReference s3ObjectReference, - final S3ObjectPluginMetrics s3ObjectPluginMetrics + final S3Client s3Client, + final S3ObjectReference s3ObjectReference, + final BucketOwnerProvider bucketOwnerProvider, + final S3ObjectPluginMetrics s3ObjectPluginMetrics ) { this.s3Client = s3Client; this.s3ObjectReference = s3ObjectReference; + this.bucketOwnerProvider = bucketOwnerProvider; this.s3ObjectPluginMetrics = s3ObjectPluginMetrics; } @@ -49,7 +53,7 @@ public long getLength() { @Override public SeekableInputStream newStream() { return new S3InputStream( - s3Client, s3ObjectReference, getMetadata(), s3ObjectPluginMetrics, DEFAULT_RETRY_DELAY, DEFAULT_RETRIES); + s3Client, s3ObjectReference, bucketOwnerProvider, getMetadata(), s3ObjectPluginMetrics, DEFAULT_RETRY_DELAY, DEFAULT_RETRIES); } /** @@ -58,9 +62,12 @@ public SeekableInputStream newStream() { */ private synchronized HeadObjectResponse getMetadata() { if (metadata == null) { - final HeadObjectRequest request = HeadObjectRequest.builder() + final HeadObjectRequest.Builder headRequestBuilder = HeadObjectRequest.builder() .bucket(s3ObjectReference.getBucketName()) - .key(s3ObjectReference.getKey()) + .key(s3ObjectReference.getKey()); + bucketOwnerProvider.getBucketOwner(s3ObjectReference.getBucketName()) + .ifPresent(headRequestBuilder::expectedBucketOwner); + final HeadObjectRequest request = headRequestBuilder .build(); metadata = s3Client.headObject(request); } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputStream.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputStream.java index e2c3570025..19bdf9a8af 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputStream.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputStream.java @@ -8,6 +8,7 @@ import dev.failsafe.function.CheckedSupplier; import org.apache.http.ConnectionClosedException; import org.apache.parquet.io.SeekableInputStream; +import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.sync.ResponseTransformer; @@ -73,12 +74,13 @@ class S3InputStream extends SeekableInputStream { private RetryPolicy retryPolicyReturningInteger; public S3InputStream( - final S3Client s3Client, - final S3ObjectReference s3ObjectReference, - final HeadObjectResponse metadata, - final S3ObjectPluginMetrics s3ObjectPluginMetrics, - final Duration retryDelay, - final int retries + final S3Client s3Client, + final S3ObjectReference s3ObjectReference, + final BucketOwnerProvider bucketOwnerProvider, + final HeadObjectResponse metadata, + final S3ObjectPluginMetrics s3ObjectPluginMetrics, + final Duration retryDelay, + final int retries ) { this.s3Client = s3Client; this.s3ObjectReference = s3ObjectReference; @@ -90,6 +92,9 @@ public S3InputStream( .bucket(this.s3ObjectReference.getBucketName()) .key(this.s3ObjectReference.getKey()); + bucketOwnerProvider.getBucketOwner(this.s3ObjectReference.getBucketName()) + .ifPresent(getObjectRequestBuilder::expectedBucketOwner); + this.retryPolicyReturningByteArray = RetryPolicy.builder() .handle(RETRYABLE_EXCEPTIONS) .withDelay(retryDelay) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java index 8ecfeb072d..fb0bfb412a 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ObjectWorker.java @@ -74,7 +74,7 @@ private void doParseObject(final AcknowledgementSet acknowledgementSet, final S3 LOG.info("Read S3 object: {}", s3ObjectReference); - final S3InputFile inputFile = new S3InputFile(s3Client, s3ObjectReference, s3ObjectPluginMetrics); + final S3InputFile inputFile = new S3InputFile(s3Client, s3ObjectReference, bucketOwnerProvider, s3ObjectPluginMetrics); final CompressionOption fileCompressionOption = compressionOption != CompressionOption.AUTOMATIC ? compressionOption : CompressionOption.fromFileName(s3ObjectReference.getKey()); diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputFileTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputFileTest.java index 8997ea6045..79d1928d35 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputFileTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputFileTest.java @@ -3,12 +3,19 @@ import org.apache.parquet.io.SeekableInputStream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import java.util.Optional; +import java.util.UUID; + import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertAll; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -20,15 +27,24 @@ public class S3InputFileTest { private S3Client s3Client; private S3ObjectReference s3ObjectReference; private S3ObjectPluginMetrics s3ObjectPluginMetrics; - private S3InputFile s3InputFile; + private String bucketName; + private String key; + private BucketOwnerProvider bucketOwnerProvider; @BeforeEach public void setUp() { s3Client = mock(S3Client.class); s3ObjectReference = mock(S3ObjectReference.class); s3ObjectPluginMetrics = mock(S3ObjectPluginMetrics.class); + bucketOwnerProvider = mock(BucketOwnerProvider.class); + bucketName = UUID.randomUUID().toString(); + key = UUID.randomUUID().toString(); + when(s3ObjectReference.getBucketName()).thenReturn(bucketName); + when(s3ObjectReference.getKey()).thenReturn(key); + } - s3InputFile = new S3InputFile(s3Client, s3ObjectReference, s3ObjectPluginMetrics); + private S3InputFile createObjectUnderTest() { + return new S3InputFile(s3Client, s3ObjectReference, bucketOwnerProvider, s3ObjectPluginMetrics); } @Test @@ -37,10 +53,42 @@ public void testGetLength() { when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse); when(headObjectResponse.contentLength()).thenReturn(12345L); - long length = s3InputFile.getLength(); + long length = createObjectUnderTest().getLength(); + + assertThat(length, equalTo(12345L)); + final ArgumentCaptor headObjectRequestArgumentCaptor = ArgumentCaptor.forClass(HeadObjectRequest.class); + verify(s3Client, times(1)).headObject(headObjectRequestArgumentCaptor.capture()); + + final HeadObjectRequest actualHeadObjectRequest = headObjectRequestArgumentCaptor.getValue(); + assertAll( + () -> assertThat(actualHeadObjectRequest.bucket(), equalTo(bucketName)), + () -> assertThat(actualHeadObjectRequest.key(), equalTo(key)), + () -> assertThat(actualHeadObjectRequest.expectedBucketOwner(), nullValue()) + ); + } + + @Test + public void getLength_requests_head_for_bucket_key_and_owner_when_bucket_has_owner() { + final HeadObjectResponse headObjectResponse = mock(HeadObjectResponse.class); + when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse); + when(headObjectResponse.contentLength()).thenReturn(12345L); + + final String owner = UUID.randomUUID().toString(); + when(bucketOwnerProvider.getBucketOwner(bucketName)).thenReturn(Optional.of(owner)); + + long length = createObjectUnderTest().getLength(); assertThat(length, equalTo(12345L)); - verify(s3Client, times(1)).headObject(any(HeadObjectRequest.class)); + + final ArgumentCaptor headObjectRequestArgumentCaptor = ArgumentCaptor.forClass(HeadObjectRequest.class); + verify(s3Client, times(1)).headObject(headObjectRequestArgumentCaptor.capture()); + + final HeadObjectRequest actualHeadObjectRequest = headObjectRequestArgumentCaptor.getValue(); + assertAll( + () -> assertThat(actualHeadObjectRequest.bucket(), equalTo(bucketName)), + () -> assertThat(actualHeadObjectRequest.key(), equalTo(key)), + () -> assertThat(actualHeadObjectRequest.expectedBucketOwner(), equalTo(owner)) + ); } @Test @@ -48,7 +96,7 @@ public void testNewStream() { HeadObjectResponse headObjectResponse = mock(HeadObjectResponse.class); when(s3Client.headObject(any(HeadObjectRequest.class))).thenReturn(headObjectResponse); - SeekableInputStream seekableInputStream = s3InputFile.newStream(); + SeekableInputStream seekableInputStream = createObjectUnderTest().newStream(); assertThat(seekableInputStream.getClass(), equalTo(S3InputStream.class)); } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputStreamTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputStreamTest.java index 12dd55988a..9eb65ee953 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputStreamTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputStreamTest.java @@ -7,8 +7,10 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider; import software.amazon.awssdk.core.sync.ResponseTransformer; import software.amazon.awssdk.http.HttpStatusCode; import software.amazon.awssdk.services.s3.S3Client; @@ -21,8 +23,14 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.time.Duration; +import java.util.Optional; +import java.util.UUID; import java.util.stream.Stream; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -43,6 +51,8 @@ class S3InputStreamTest { private S3Client s3Client; @Mock(lenient = true) private S3ObjectReference s3ObjectReference; + @Mock + private BucketOwnerProvider bucketOwnerProvider; @Mock(lenient = true) private HeadObjectResponse metadata; @Mock(lenient = true) @@ -50,8 +60,8 @@ class S3InputStreamTest { private DistributionSummary s3ObjectSizeProcessedSummary; private Counter s3ObjectsFailedNotFoundCounter; private Counter s3ObjectsFailedAccessDeniedCounter; - - private S3InputStream s3InputStream; + private String bucketName; + private String key; @BeforeEach void setUp() { @@ -59,22 +69,27 @@ void setUp() { s3ObjectsFailedNotFoundCounter = mock(Counter.class); s3ObjectsFailedAccessDeniedCounter = mock(Counter.class); - when(s3ObjectReference.getBucketName()).thenReturn("test-bucket"); - when(s3ObjectReference.getKey()).thenReturn("test-key"); + bucketName = UUID.randomUUID().toString(); + key = UUID.randomUUID().toString(); + when(s3ObjectReference.getBucketName()).thenReturn(bucketName); + when(s3ObjectReference.getKey()).thenReturn(key); when(metadata.contentLength()).thenReturn(1000L); when(s3ObjectPluginMetrics.getS3ObjectSizeProcessedSummary()).thenReturn(s3ObjectSizeProcessedSummary); when(s3ObjectPluginMetrics.getS3ObjectsFailedNotFoundCounter()).thenReturn(s3ObjectsFailedNotFoundCounter); when(s3ObjectPluginMetrics.getS3ObjectsFailedAccessDeniedCounter()).thenReturn(s3ObjectsFailedAccessDeniedCounter); + } - s3InputStream = new S3InputStream( - s3Client, s3ObjectReference, metadata, s3ObjectPluginMetrics, RETRY_DELAY, RETRIES); + private S3InputStream createObjectUnderTest() { + return new S3InputStream( + s3Client, s3ObjectReference, bucketOwnerProvider, metadata, s3ObjectPluginMetrics, RETRY_DELAY, RETRIES); } @Test void testAvailable() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); - s3InputStream.seek(0); + final S3InputStream s3InputStream = createObjectUnderTest(); + createObjectUnderTest().seek(0); int availableBytes = s3InputStream.available(); assertEquals(9, availableBytes); @@ -82,6 +97,7 @@ void testAvailable() throws IOException { @Test void testClose() throws IOException { + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.close(); assertThrows(IllegalStateException .class, () -> s3InputStream.read()); @@ -91,6 +107,7 @@ void testClose() throws IOException { void testMarkAndReset() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.seek(5); s3InputStream.mark(5); @@ -104,6 +121,7 @@ void testMarkAndReset() throws IOException { void testRead() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.seek(0); // Force opening the stream int firstByte = s3InputStream.read(); @@ -114,11 +132,56 @@ void testRead() throws IOException { verify(s3ObjectSizeProcessedSummary).record(1.0); } + @Test + void read_will_read_from_S3_with_bucket_and_key_when_no_owner() throws IOException { + final InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); + when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); + s3InputStream.seek(0); // Force opening the stream + + s3InputStream.read(); + s3InputStream.close(); + + final ArgumentCaptor getObjectRequestArgumentCaptor = ArgumentCaptor.forClass(GetObjectRequest.class); + verify(s3Client).getObject(getObjectRequestArgumentCaptor.capture(), any(ResponseTransformer.class)); + + final GetObjectRequest getObjectRequest = getObjectRequestArgumentCaptor.getValue(); + assertAll( + () -> assertThat(getObjectRequest.bucket(), equalTo(bucketName)), + () -> assertThat(getObjectRequest.key(), equalTo(key)), + () -> assertThat(getObjectRequest.expectedBucketOwner(), nullValue()) + ); + } + + @Test + void read_will_read_from_S3_with_bucket_key_and_expected_owner_when_owner_defined() throws IOException { + final String owner = UUID.randomUUID().toString(); + when(bucketOwnerProvider.getBucketOwner(bucketName)).thenReturn(Optional.of(owner)); + final InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); + when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); + s3InputStream.seek(0); // Force opening the stream + + s3InputStream.read(); + s3InputStream.close(); + + final ArgumentCaptor getObjectRequestArgumentCaptor = ArgumentCaptor.forClass(GetObjectRequest.class); + verify(s3Client).getObject(getObjectRequestArgumentCaptor.capture(), any(ResponseTransformer.class)); + + final GetObjectRequest getObjectRequest = getObjectRequestArgumentCaptor.getValue(); + assertAll( + () -> assertThat(getObjectRequest.bucket(), equalTo(bucketName)), + () -> assertThat(getObjectRequest.key(), equalTo(key)), + () -> assertThat(getObjectRequest.expectedBucketOwner(), equalTo(owner)) + ); + } + @Test void testReadEndOfFile() throws IOException { InputStream inputStream = new ByteArrayInputStream("".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); int firstByte = s3InputStream.read(); assertEquals(-1, firstByte); @@ -138,6 +201,7 @@ void testReadSucceedsAfterRetries(final Class retryableExce .thenThrow(retryableExceptionClass) .thenReturn(1); + final S3InputStream s3InputStream = createObjectUnderTest(); int firstByte = s3InputStream.read(); assertEquals(1, firstByte); @@ -160,6 +224,7 @@ void testReadFailsAfterRetries(final Class retryableExcepti when(mockInputStream.read()).thenThrow(retryableExceptionClass); + final S3InputStream s3InputStream = createObjectUnderTest(); assertThrows(IOException.class, () -> s3InputStream.read()); s3InputStream.close(); @@ -174,6 +239,7 @@ void testReadFailsAfterRetries(final Class retryableExcepti void testReadByteArray() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.seek(0); // Force opening the stream byte[] buffer = new byte[4]; @@ -191,6 +257,7 @@ void testReadByteArray() throws IOException { void testReadAllBytes() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.seek(0); // Force opening the stream final byte[] buffer = s3InputStream.readAllBytes(); @@ -214,6 +281,7 @@ void testReadAllBytesSucceedsAfterRetries(final Class retry .thenThrow(retryableExceptionClass) .thenReturn("Test data".getBytes()); + final S3InputStream s3InputStream = createObjectUnderTest(); final byte[] buffer = s3InputStream.readAllBytes(); assertArrayEquals("Test data".getBytes(), buffer); @@ -238,6 +306,7 @@ void testReadAllBytesFailsAfterRetries(final Class retryabl when(mockInputStream.readAllBytes()).thenThrow(retryableExceptionClass); + final S3InputStream s3InputStream = createObjectUnderTest(); assertThrows(IOException.class, () -> s3InputStream.readAllBytes()); s3InputStream.close(); @@ -252,6 +321,7 @@ void testReadAllBytesFailsAfterRetries(final Class retryabl void testReadNBytes_intoArray() throws Exception { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.seek(0); // Force opening the stream byte[] buffer = new byte[9]; @@ -270,6 +340,7 @@ void testReadNBytes_endOfFile() throws Exception { when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); byte[] buffer = new byte[9]; + final S3InputStream s3InputStream = createObjectUnderTest(); int bytesRead = s3InputStream.readNBytes(buffer, 0, 4); assertEquals(0, bytesRead); @@ -282,6 +353,7 @@ void testReadNBytes_endOfFile() throws Exception { void testReadNBytes_getArray() throws Exception { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.seek(0); // Force opening the stream final byte[] buffer = s3InputStream.readNBytes(4); @@ -296,6 +368,7 @@ void testReadNBytes_getArray() throws Exception { void testSkip() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.seek(0); // Force opening the stream long skippedBytes = s3InputStream.skip(5); @@ -311,6 +384,7 @@ void testSkip() throws IOException { void testSeek() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.read(); s3InputStream.seek(5); @@ -328,6 +402,7 @@ void testSeek() throws IOException { void testReadFullyByteArray() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.seek(0); // Force opening the stream byte[] buffer = new byte[4]; @@ -343,6 +418,7 @@ void testReadFullyByteArray() throws IOException { void testReadFullyByteArrayWithStartAndLength() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.seek(0); // Force opening the stream byte[] buffer = new byte[6]; @@ -358,6 +434,7 @@ void testReadFullyByteArrayWithStartAndLength() throws IOException { void testReadFullyByteBuffer() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.seek(0); // Force opening the stream ByteBuffer buffer = ByteBuffer.allocate(4); @@ -374,6 +451,7 @@ void testReadFullyByteBuffer() throws IOException { void testReadFullyByteBuffer_endOfFile() throws IOException { InputStream inputStream = new ByteArrayInputStream("".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.seek(0); // Force opening the stream ByteBuffer buffer = ByteBuffer.allocate(4); @@ -387,6 +465,7 @@ void testReadFullyByteBuffer_endOfFile() throws IOException { void testReadFullyHeapBuffer() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.seek(0); // Force opening the stream ByteBuffer buffer = ByteBuffer.allocate(4); @@ -403,6 +482,7 @@ void testReadFullyHeapBuffer() throws IOException { void testReadFullyDirectBuffer() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.seek(0); // Force opening the stream ByteBuffer buffer = ByteBuffer.allocateDirect(4); @@ -421,6 +501,7 @@ void testReadFullyDirectBuffer() throws IOException { void testReadFromClosedFails() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.seek(0); // Force opening the stream byte[] buffer = new byte[4]; @@ -436,6 +517,7 @@ void testReadFromClosedFails() throws IOException { void testReadAfterSeekBackwardsWorks() throws IOException { InputStream inputStream = new ByteArrayInputStream("Test data".getBytes()); when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class))).thenReturn(inputStream); + final S3InputStream s3InputStream = createObjectUnderTest(); s3InputStream.seek(5); // Force opening the stream byte[] buffer = new byte[4]; @@ -460,6 +542,7 @@ void testS3ObjectsFailedNotFoundCounter() { .statusCode(HttpStatusCode.NOT_FOUND) .build()); + final S3InputStream s3InputStream = createObjectUnderTest(); assertThrows(IOException.class, () -> s3InputStream.read()); // Force opening the stream verify(s3ObjectsFailedNotFoundCounter).increment(); @@ -472,6 +555,7 @@ void testS3ObjectsFailedAccessDeniedCounter() { .statusCode(HttpStatusCode.FORBIDDEN) .build()); + final S3InputStream s3InputStream = createObjectUnderTest(); assertThrows(IOException.class, () -> s3InputStream.read()); // Force opening the stream verify(s3ObjectsFailedAccessDeniedCounter).increment();