From 9f78542533dd24ed21e29a12950938c0c4b23636 Mon Sep 17 00:00:00 2001
From: Adi Suresh
Date: Tue, 11 Jul 2023 12:25:20 -0500
Subject: [PATCH] Retry s3 reads on socket exceptions. (#2992)

* Retry s3 reads on socket exceptions.

S3 will frequently reset the connection on its end. To avoid losing data,
Data Prepper should retry all socket exceptions by attempting to re-open
the stream.

Signed-off-by: Adi Suresh

* Bubble up parquet exceptions.

Signed-off-by: Adi Suresh

---------

Signed-off-by: Adi Suresh
---
 .../codec/parquet/ParquetInputCodec.java      |   5 +-
 data-prepper-plugins/s3-source/build.gradle   |   2 +
 .../plugins/source/S3InputFile.java           |  10 +-
 .../plugins/source/S3InputStream.java         | 111 ++++++++++++++----
 .../plugins/source/S3InputStreamTest.java     | 109 ++++++++++++++++-
 5 files changed, 205 insertions(+), 32 deletions(-)

diff --git a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetInputCodec.java b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetInputCodec.java
index c2f5f8dad7..610df799ee 100644
--- a/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetInputCodec.java
+++ b/data-prepper-plugins/parquet-codecs/src/main/java/org/opensearch/dataprepper/plugins/codec/parquet/ParquetInputCodec.java
@@ -91,8 +91,9 @@ private void parseParquetFile(final InputFile inputFile, final Consumer<Record<
                 eventConsumer.accept(new Record<>(event));
             }
-        } catch (Exception parquetException){
-            LOG.error("An exception occurred while parsing parquet InputStream ", parquetException);
+        } catch (Exception e){
+            LOG.error("An exception occurred while parsing parquet InputStream ", e);
+            throw new IOException(e);
         }
     }
diff --git a/data-prepper-plugins/s3-source/build.gradle b/data-prepper-plugins/s3-source/build.gradle
index d2fb01a822..e031309da6 100644
--- a/data-prepper-plugins/s3-source/build.gradle
+++ b/data-prepper-plugins/s3-source/build.gradle
@@ -32,6 +32,8 @@ dependencies {
     implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2'
     implementation 'org.xerial.snappy:snappy-java:1.1.10.1'
     implementation 'org.apache.parquet:parquet-common:1.12.3'
+    implementation 'dev.failsafe:failsafe:3.3.2'
+    implementation 'org.apache.httpcomponents:httpcore:4.4.15'
     testImplementation 'org.apache.commons:commons-lang3:3.12.0'
     testImplementation 'com.github.tomakehurst:wiremock:3.0.0-beta-8'
     testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputFile.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputFile.java
index 3d14a08dc9..0f72bf1a40 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputFile.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputFile.java
@@ -6,8 +6,14 @@
 import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 
+import java.time.Duration;
+
 public class S3InputFile implements InputFile {
 
+    private static final Duration DEFAULT_RETRY_DELAY = Duration.ofSeconds(6);
+
+    private static final int DEFAULT_RETRIES = 10;
+
     private final S3Client s3Client;
 
     private final S3ObjectReference s3ObjectReference;
@@ -42,8 +48,8 @@ public long getLength() {
      */
     @Override
     public SeekableInputStream newStream() {
-
-        return new S3InputStream(s3Client, s3ObjectReference, getMetadata(), s3ObjectPluginMetrics);
+        return new S3InputStream(
+            s3Client, s3ObjectReference, getMetadata(), s3ObjectPluginMetrics, DEFAULT_RETRY_DELAY, DEFAULT_RETRIES);
     }
 
     /**
diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputStream.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputStream.java
index 14f8a509ff..e2c3570025 100644
--- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputStream.java
+++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3InputStream.java
@@ -1,13 +1,12 @@
 package org.opensearch.dataprepper.plugins.source;
 
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.LongAdder;
-
 import com.google.common.base.Preconditions;
 import com.google.common.io.ByteStreams;
+import dev.failsafe.Failsafe;
+import dev.failsafe.FailsafeException;
+import dev.failsafe.RetryPolicy;
+import dev.failsafe.function.CheckedSupplier;
+import org.apache.http.ConnectionClosedException;
 import org.apache.parquet.io.SeekableInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -19,8 +18,25 @@
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.atomic.LongAdder;
+
 class S3InputStream extends SeekableInputStream {
 
+    static final List<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS = List.of(
+        ConnectionClosedException.class,
+        EOFException.class,
+        SocketException.class,
+        SocketTimeoutException.class
+    );
+
     private static final int COPY_BUFFER_SIZE = 8192;
 
     private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class);
@@ -52,11 +68,17 @@ class S3InputStream extends SeekableInputStream {
     private boolean closed = false;
 
+    private RetryPolicy<byte[]> retryPolicyReturningByteArray;
+
+    private RetryPolicy<Integer> retryPolicyReturningInteger;
+
     public S3InputStream(
-        final S3Client s3Client,
-        final S3ObjectReference s3ObjectReference,
-        final HeadObjectResponse metadata,
-        final S3ObjectPluginMetrics s3ObjectPluginMetrics
+            final S3Client s3Client,
+            final S3ObjectReference s3ObjectReference,
+            final HeadObjectResponse metadata,
+            final S3ObjectPluginMetrics s3ObjectPluginMetrics,
+            final Duration retryDelay,
+            final int retries
     ) {
         this.s3Client = s3Client;
         this.s3ObjectReference = s3ObjectReference;
@@ -65,10 +87,23 @@ public S3InputStream(
         this.bytesCounter = new LongAdder();
 
         this.getObjectRequestBuilder = GetObjectRequest.builder()
-            .bucket(this.s3ObjectReference.getBucketName())
-            .key(this.s3ObjectReference.getKey());
+                .bucket(this.s3ObjectReference.getBucketName())
+                .key(this.s3ObjectReference.getKey());
+
+        this.retryPolicyReturningByteArray = RetryPolicy.<byte[]>builder()
+            .handle(RETRYABLE_EXCEPTIONS)
+            .withDelay(retryDelay)
+            .withMaxRetries(retries)
+            .build();
+
+        this.retryPolicyReturningInteger = RetryPolicy.<Integer>builder()
+            .handle(RETRYABLE_EXCEPTIONS)
+            .withDelay(retryDelay)
+            .withMaxRetries(retries)
+            .build();
     }
+
     // Implement all InputStream methods first:
 
     /**
@@ -129,7 +164,7 @@ public int read() throws IOException {
         Preconditions.checkState(!closed, "Cannot read: already closed");
         positionStream();
 
-        final int byteRead = stream.read();
+        final int byteRead = executeWithRetriesAndReturnInt(() -> stream.read());
 
         if (byteRead != -1) {
             pos += 1;
@@ -165,7 +200,7 @@ public int read(byte[] b, int off, int len) throws IOException {
         Preconditions.checkState(!closed, "Cannot read: already closed");
         positionStream();
 
-        final int bytesRead = stream.read(b, off, len);
+        final int bytesRead = executeWithRetriesAndReturnInt(() -> stream.read(b, off, len));
 
         if (bytesRead > 0) {
             pos += bytesRead;
@@ -186,7 +221,7 @@ public byte[] readAllBytes() throws IOException {
         Preconditions.checkState(!closed, "Cannot read: already closed");
         positionStream();
 
-        final byte[] bytesRead = stream.readAllBytes();
+        final byte[] bytesRead = executeWithRetriesAndReturnByteArray(() -> stream.readAllBytes());
 
         pos += bytesRead.length;
         next += bytesRead.length;
@@ -208,7 +243,7 @@ public int readNBytes(byte[] b, int off, int len) throws IOException {
         Preconditions.checkState(!closed, "Cannot read: already closed");
         positionStream();
 
-        final int bytesRead = stream.readNBytes(b, off, len);
+        final int bytesRead = executeWithRetriesAndReturnInt(() -> stream.readNBytes(b, off, len));
 
         if (bytesRead > 0) {
             pos += bytesRead;
@@ -229,7 +264,7 @@ public byte[] readNBytes(int len) throws IOException {
         Preconditions.checkState(!closed, "Cannot read: already closed");
         positionStream();
 
-        final byte[] bytesRead = stream.readNBytes(len);
+        final byte[] bytesRead = executeWithRetriesAndReturnByteArray(() -> stream.readNBytes(len));
 
         pos += bytesRead.length;
         next += bytesRead.length;
@@ -332,7 +367,7 @@ public void readFully(byte[] bytes, int start, int len) throws IOException {
         Preconditions.checkState(!closed, "Cannot read: already closed");
         positionStream();
 
-        int bytesRead = readFully(stream, bytes, start, len);
+        final int bytesRead = executeWithRetriesAndReturnInt(() -> readFully(stream, bytes, start, len));
 
         if (bytesRead > 0) {
             this.pos += bytesRead;
@@ -360,9 +395,9 @@ public int read(ByteBuffer buf) throws IOException {
         int bytesRead = 0;
 
         if (buf.hasArray()) {
-            bytesRead = readHeapBuffer(stream, buf);
+            bytesRead = executeWithRetriesAndReturnInt(() -> readHeapBuffer(stream, buf));
         } else {
-            bytesRead = readDirectBuffer(stream, buf, temp);
+            bytesRead = executeWithRetriesAndReturnInt(() -> readDirectBuffer(stream, buf, temp));
         }
 
         if (bytesRead > 0) {
@@ -393,9 +428,9 @@ public void readFully(ByteBuffer buf) throws IOException {
         int bytesRead = 0;
 
         if (buf.hasArray()) {
-            bytesRead = readFullyHeapBuffer(stream, buf);
+            bytesRead = executeWithRetriesAndReturnInt(() -> readFullyHeapBuffer(stream, buf));
         } else {
-            bytesRead = readFullyDirectBuffer(stream, buf, temp);
+            bytesRead = executeWithRetriesAndReturnInt(() -> readFullyDirectBuffer(stream, buf, temp));
         }
 
         if (bytesRead > 0) {
@@ -612,9 +647,7 @@ static int readFullyDirectBuffer(InputStream f, ByteBuffer buf, byte[] temp) thr
         while (nextReadLength > 0 && (bytesRead = f.read(temp, 0, nextReadLength)) >= 0) {
             buf.put(temp, 0, bytesRead);
             nextReadLength = Math.min(buf.remaining(), temp.length);
-            if (bytesRead >= 0) {
-                totalBytesRead += bytesRead;
-            }
+            totalBytesRead += bytesRead;
         }
 
         if (bytesRead < 0 && buf.remaining() > 0) {
@@ -632,4 +665,32 @@ private void recordS3Exception(final S3Exception ex) {
             s3ObjectPluginMetrics.getS3ObjectsFailedAccessDeniedCounter().increment();
         }
     }
+
+    private int executeWithRetriesAndReturnInt(CheckedSupplier<Integer> supplier) throws IOException {
+        return executeWithRetries(retryPolicyReturningInteger, supplier);
+    }
+
+    private byte[] executeWithRetriesAndReturnByteArray(CheckedSupplier<byte[]> supplier) throws IOException {
+        return executeWithRetries(retryPolicyReturningByteArray, supplier);
+    }
+
+
+    private <T> T executeWithRetries(RetryPolicy<T> retryPolicy, CheckedSupplier<T> supplier) throws IOException {
+        try {
+            return Failsafe.with(retryPolicy).get(() -> {
+                try {
+                    return supplier.get();
+                } catch (ConnectionClosedException | EOFException | SocketException | SocketTimeoutException e) {
+                    LOG.warn("Resetting stream due to underlying socket exception", e);
+                    openStream();
+                    throw e;
+                }
+            });
+        } catch (FailsafeException e) {
+            LOG.error("Failed to read with Retries", e);
+            throw new IOException(e.getCause());
+        }
+
+    }
+
 }
diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputStreamTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputStreamTest.java
index d75254189d..12dd55988a 100644
--- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputStreamTest.java
+++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/S3InputStreamTest.java
@@ -5,6 +5,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import software.amazon.awssdk.core.sync.ResponseTransformer;
@@ -15,21 +17,28 @@
 import software.amazon.awssdk.services.s3.model.S3Exception;
 
 import java.io.ByteArrayInputStream;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 class S3InputStreamTest {
+
+    private static final Duration RETRY_DELAY = Duration.ofMillis(10);
+
+    private static final int RETRIES = 3;
+
     @Mock(lenient = true)
     private S3Client s3Client;
     @Mock(lenient = true)
@@ -57,7 +66,8 @@ void setUp() {
         when(s3ObjectPluginMetrics.getS3ObjectsFailedNotFoundCounter()).thenReturn(s3ObjectsFailedNotFoundCounter);
         when(s3ObjectPluginMetrics.getS3ObjectsFailedAccessDeniedCounter()).thenReturn(s3ObjectsFailedAccessDeniedCounter);
 
-        s3InputStream = new S3InputStream(s3Client, s3ObjectReference, metadata, s3ObjectPluginMetrics);
+        s3InputStream = new S3InputStream(
+            s3Client, s3ObjectReference, metadata, s3ObjectPluginMetrics, RETRY_DELAY, RETRIES);
     }
 
     @Test
@@ -117,6 +127,49 @@ void testReadEndOfFile() throws IOException {
         verify(s3ObjectSizeProcessedSummary).record(0.0);
     }
 
+    @ParameterizedTest
+    @MethodSource("retryableExceptions")
+    void testReadSucceedsAfterRetries(final Class<? extends Throwable> retryableExceptionClass) throws IOException {
+        InputStream mockInputStream = mock(InputStream.class);
+        when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class)))
+            .thenReturn(mockInputStream);
+
+        when(mockInputStream.read())
+            .thenThrow(retryableExceptionClass)
+            .thenReturn(1);
+
+        int firstByte = s3InputStream.read();
+        assertEquals(1, firstByte);
+
+        s3InputStream.close();
+
+        verify(s3ObjectSizeProcessedSummary).record(1.0);
+
+        verify(mockInputStream, times(2)).read();
+        verify(mockInputStream, times(2)).close();
+        verify(s3Client, times(2))
+            .getObject(any(GetObjectRequest.class), any(ResponseTransformer.class));
+    }
+
+    @ParameterizedTest
+    @MethodSource("retryableExceptions")
+    void testReadFailsAfterRetries(final Class<? extends Throwable> retryableExceptionClass) throws IOException {
+        InputStream mockInputStream = mock(InputStream.class);
+        when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class)))
+            .thenReturn(mockInputStream);
+
+        when(mockInputStream.read()).thenThrow(retryableExceptionClass);
+
+        assertThrows(IOException.class, () -> s3InputStream.read());
+
+        s3InputStream.close();
+
+        verify(mockInputStream, times(RETRIES + 1)).read();
+        verify(mockInputStream, times(RETRIES + 2)).close();
+        verify(s3Client, times(RETRIES + 2))
+            .getObject(any(GetObjectRequest.class), any(ResponseTransformer.class));
+    }
+
     @Test
     void testReadByteArray() throws IOException {
         InputStream inputStream = new ByteArrayInputStream("Test data".getBytes());
@@ -148,6 +201,53 @@ void testReadAllBytes() throws IOException {
         verify(s3ObjectSizeProcessedSummary).record(9.0);
     }
 
+    @ParameterizedTest
+    @MethodSource("retryableExceptions")
+    void testReadAllBytesSucceedsAfterRetries(final Class<? extends Throwable> retryableExceptionClass)
+        throws IOException
+    {
+        InputStream mockInputStream = mock(InputStream.class);
+        when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class)))
+            .thenReturn(mockInputStream);
+
+        when(mockInputStream.readAllBytes())
+            .thenThrow(retryableExceptionClass)
+            .thenReturn("Test data".getBytes());
+
+        final byte[] buffer = s3InputStream.readAllBytes();
+
+        assertArrayEquals("Test data".getBytes(), buffer);
+
+        s3InputStream.close();
+
+        verify(s3ObjectSizeProcessedSummary).record(9.0);
+        verify(mockInputStream, times(2)).readAllBytes();
+        verify(mockInputStream, times(2)).close();
+        verify(s3Client, times(2))
+            .getObject(any(GetObjectRequest.class), any(ResponseTransformer.class));
+    }
+
+    @ParameterizedTest
+    @MethodSource("retryableExceptions")
+    void testReadAllBytesFailsAfterRetries(final Class<? extends Throwable> retryableExceptionClass)
+        throws IOException
+    {
+        InputStream mockInputStream = mock(InputStream.class);
+        when(s3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class)))
+            .thenReturn(mockInputStream);
+
+        when(mockInputStream.readAllBytes()).thenThrow(retryableExceptionClass);
+
+        assertThrows(IOException.class, () -> s3InputStream.readAllBytes());
+
+        s3InputStream.close();
+
+        verify(mockInputStream, times(RETRIES + 1)).readAllBytes();
+        verify(mockInputStream, times(RETRIES + 2)).close();
+        verify(s3Client, times(RETRIES + 2))
+            .getObject(any(GetObjectRequest.class), any(ResponseTransformer.class));
+    }
+
     @Test
     void testReadNBytes_intoArray() throws Exception {
         InputStream inputStream = new ByteArrayInputStream("Test data".getBytes());
@@ -277,7 +377,7 @@ void testReadFullyByteBuffer_endOfFile() throws IOException {
         s3InputStream.seek(0); // Force opening the stream
         ByteBuffer buffer = ByteBuffer.allocate(4);
 
-        assertThrows(EOFException.class, () -> s3InputStream.readFully(buffer));
+        assertThrows(IOException.class, () -> s3InputStream.readFully(buffer));
 
         s3InputStream.close();
 
         verify(s3ObjectSizeProcessedSummary).record(0.0);
@@ -377,4 +477,7 @@ void testS3ObjectsFailedAccessDeniedCounter() {
         verify(s3ObjectsFailedAccessDeniedCounter).increment();
     }
 
+    private static Stream<Class<? extends Throwable>> retryableExceptions() {
+        return S3InputStream.RETRYABLE_EXCEPTIONS.stream();
+    }
 }
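
Reviewer note (illustrative, not part of the patch): the sketch below is a minimal, self-contained version of the retry pattern this change introduces — wrap each read in a Failsafe RetryPolicy that handles socket-level failures, re-open the source before the next attempt, and surface exhausted retries as an IOException. It assumes the Failsafe 3.x dependency added in build.gradle; the class and helper names (ReopeningReader, openStream) are hypothetical stand-ins, not Data Prepper APIs.

// Illustrative sketch of the retry-and-reopen pattern applied to S3InputStream.
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.RetryPolicy;

import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.time.Duration;

class ReopeningReader {
    // Retries socket-level failures; the delay and retry count mirror
    // DEFAULT_RETRY_DELAY and DEFAULT_RETRIES introduced in S3InputFile.
    private final RetryPolicy<Integer> retryPolicy = RetryPolicy.<Integer>builder()
            .handle(SocketException.class, SocketTimeoutException.class)
            .withDelay(Duration.ofSeconds(6))
            .withMaxRetries(10)
            .build();

    private InputStream stream = InputStream.nullInputStream();

    int readWithRetries() throws IOException {
        try {
            return Failsafe.with(retryPolicy).get(() -> {
                try {
                    return stream.read();
                } catch (SocketException | SocketTimeoutException e) {
                    // Re-open so the retried attempt does not reuse the broken connection.
                    stream = openStream();
                    throw e;
                }
            });
        } catch (FailsafeException e) {
            // Failsafe wraps checked failures; rethrow the cause as an IOException,
            // matching what executeWithRetries does in the patch.
            throw new IOException(e.getCause());
        }
    }

    private InputStream openStream() {
        // Hypothetical placeholder; in the patch, openStream() re-opens the S3
        // object stream so reading can resume.
        return InputStream.nullInputStream();
    }
}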