Skip to content

Commit

Permalink
Retry s3 reads on socket exceptions. (opensearch-project#2992) (opens…
Browse files Browse the repository at this point in the history
…earch-project#3008)

* Retry s3 reads on socket exceptions.

S3 will reset the connection on their end frequently. To not lose data,
data prepper should retry all socket exceptions by attempting to re-open
the stream.

Signed-off-by: Adi Suresh <[email protected]>

* Bubble up parquet exceptions.

Signed-off-by: Adi Suresh <[email protected]>

---------

Signed-off-by: Adi Suresh <[email protected]>
(cherry picked from commit 9f78542)

Co-authored-by: Adi Suresh <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and asuresh8 committed Jul 11, 2023
1 parent 8e0772d commit 62933f6
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ private void parseParquetFile(final InputFile inputFile, final Consumer<Record<E

eventConsumer.accept(new Record<>(event));
}
} catch (Exception parquetException){
LOG.error("An exception occurred while parsing parquet InputStream ", parquetException);
} catch (Exception e){
LOG.error("An exception occurred while parsing parquet InputStream ", e);
throw new IOException(e);
}
}

Expand Down
2 changes: 2 additions & 0 deletions data-prepper-plugins/s3-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ dependencies {
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2'
implementation 'org.xerial.snappy:snappy-java:1.1.10.1'
implementation 'org.apache.parquet:parquet-common:1.12.3'
implementation 'dev.failsafe:failsafe:3.3.2'
implementation 'org.apache.httpcomponents:httpcore:4.4.15'
testImplementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation 'com.github.tomakehurst:wiremock:3.0.0-beta-8'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,14 @@
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;

import java.time.Duration;

public class S3InputFile implements InputFile {

private static final Duration DEFAULT_RETRY_DELAY = Duration.ofSeconds(6);

private static final int DEFAULT_RETRIES = 10;

private final S3Client s3Client;

private final S3ObjectReference s3ObjectReference;
Expand Down Expand Up @@ -42,8 +48,8 @@ public long getLength() {
*/
@Override
public SeekableInputStream newStream() {

return new S3InputStream(s3Client, s3ObjectReference, getMetadata(), s3ObjectPluginMetrics);
return new S3InputStream(
s3Client, s3ObjectReference, getMetadata(), s3ObjectPluginMetrics, DEFAULT_RETRY_DELAY, DEFAULT_RETRIES);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package org.opensearch.dataprepper.plugins.source;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.LongAdder;

import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.RetryPolicy;
import dev.failsafe.function.CheckedSupplier;
import org.apache.http.ConnectionClosedException;
import org.apache.parquet.io.SeekableInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -19,8 +18,25 @@
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

class S3InputStream extends SeekableInputStream {

/**
 * Exception types that the retry policies built in the constructor are configured to handle.
 * The same four types are caught explicitly in {@code executeWithRetries}, which resets the
 * stream before rethrowing them.
 */
static final List<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS = List.of(
ConnectionClosedException.class,
EOFException.class,
SocketException.class,
SocketTimeoutException.class
);

private static final int COPY_BUFFER_SIZE = 8192;

private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class);
Expand Down Expand Up @@ -52,11 +68,17 @@ class S3InputStream extends SeekableInputStream {

private boolean closed = false;

private RetryPolicy<byte[]> retryPolicyReturningByteArray;

private RetryPolicy<Integer> retryPolicyReturningInteger;

public S3InputStream(
final S3Client s3Client,
final S3ObjectReference s3ObjectReference,
final HeadObjectResponse metadata,
final S3ObjectPluginMetrics s3ObjectPluginMetrics
final S3Client s3Client,
final S3ObjectReference s3ObjectReference,
final HeadObjectResponse metadata,
final S3ObjectPluginMetrics s3ObjectPluginMetrics,
final Duration retryDelay,
final int retries
) {
this.s3Client = s3Client;
this.s3ObjectReference = s3ObjectReference;
Expand All @@ -65,10 +87,23 @@ public S3InputStream(
this.bytesCounter = new LongAdder();

this.getObjectRequestBuilder = GetObjectRequest.builder()
.bucket(this.s3ObjectReference.getBucketName())
.key(this.s3ObjectReference.getKey());
.bucket(this.s3ObjectReference.getBucketName())
.key(this.s3ObjectReference.getKey());

this.retryPolicyReturningByteArray = RetryPolicy.<byte[]>builder()
.handle(RETRYABLE_EXCEPTIONS)
.withDelay(retryDelay)
.withMaxRetries(retries)
.build();

this.retryPolicyReturningInteger = RetryPolicy.<Integer>builder()
.handle(RETRYABLE_EXCEPTIONS)
.withDelay(retryDelay)
.withMaxRetries(retries)
.build();
}


// Implement all InputStream methods first:

/**
Expand Down Expand Up @@ -129,7 +164,7 @@ public int read() throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

final int byteRead = stream.read();
final int byteRead = executeWithRetriesAndReturnInt(() -> stream.read());

if (byteRead != -1) {
pos += 1;
Expand Down Expand Up @@ -165,7 +200,7 @@ public int read(byte[] b, int off, int len) throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

final int bytesRead = stream.read(b, off, len);
final int bytesRead = executeWithRetriesAndReturnInt(() -> stream.read(b, off, len));

if (bytesRead > 0) {
pos += bytesRead;
Expand All @@ -186,7 +221,7 @@ public byte[] readAllBytes() throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

final byte[] bytesRead = stream.readAllBytes();
final byte[] bytesRead = executeWithRetriesAndReturnByteArray(() -> stream.readAllBytes());

pos += bytesRead.length;
next += bytesRead.length;
Expand All @@ -208,7 +243,7 @@ public int readNBytes(byte[] b, int off, int len) throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

final int bytesRead = stream.readNBytes(b, off, len);
final int bytesRead = executeWithRetriesAndReturnInt(() -> stream.readNBytes(b, off, len));

if (bytesRead > 0) {
pos += bytesRead;
Expand All @@ -229,7 +264,7 @@ public byte[] readNBytes(int len) throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

final byte[] bytesRead = stream.readNBytes(len);
final byte[] bytesRead = executeWithRetriesAndReturnByteArray(() -> stream.readNBytes(len));

pos += bytesRead.length;
next += bytesRead.length;
Expand Down Expand Up @@ -332,7 +367,7 @@ public void readFully(byte[] bytes, int start, int len) throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

int bytesRead = readFully(stream, bytes, start, len);
final int bytesRead = executeWithRetriesAndReturnInt(() -> readFully(stream, bytes, start, len));

if (bytesRead > 0) {
this.pos += bytesRead;
Expand Down Expand Up @@ -360,9 +395,9 @@ public int read(ByteBuffer buf) throws IOException {

int bytesRead = 0;
if (buf.hasArray()) {
bytesRead = readHeapBuffer(stream, buf);
bytesRead = executeWithRetriesAndReturnInt(() -> readHeapBuffer(stream, buf));
} else {
bytesRead = readDirectBuffer(stream, buf, temp);
bytesRead = executeWithRetriesAndReturnInt(() -> readDirectBuffer(stream, buf, temp));
}

if (bytesRead > 0) {
Expand Down Expand Up @@ -393,9 +428,9 @@ public void readFully(ByteBuffer buf) throws IOException {

int bytesRead = 0;
if (buf.hasArray()) {
bytesRead = readFullyHeapBuffer(stream, buf);
bytesRead = executeWithRetriesAndReturnInt(() -> readFullyHeapBuffer(stream, buf));
} else {
bytesRead = readFullyDirectBuffer(stream, buf, temp);
bytesRead = executeWithRetriesAndReturnInt(() -> readFullyDirectBuffer(stream, buf, temp));
}

if (bytesRead > 0) {
Expand Down Expand Up @@ -612,9 +647,7 @@ static int readFullyDirectBuffer(InputStream f, ByteBuffer buf, byte[] temp) thr
while (nextReadLength > 0 && (bytesRead = f.read(temp, 0, nextReadLength)) >= 0) {
buf.put(temp, 0, bytesRead);
nextReadLength = Math.min(buf.remaining(), temp.length);
if (bytesRead >= 0) {
totalBytesRead += bytesRead;
}
totalBytesRead += bytesRead;
}

if (bytesRead < 0 && buf.remaining() > 0) {
Expand All @@ -632,4 +665,32 @@ private void recordS3Exception(final S3Exception ex) {
s3ObjectPluginMetrics.getS3ObjectsFailedAccessDeniedCounter().increment();
}
}

/**
 * Runs an {@code Integer}-producing read through {@code executeWithRetries} using the
 * {@code Integer}-typed retry policy, and returns the unboxed result.
 *
 * @param supplier the read operation to execute
 * @return the value produced by {@code supplier}
 * @throws IOException if {@code executeWithRetries} reports a failure
 */
private int executeWithRetriesAndReturnInt(CheckedSupplier<Integer> supplier) throws IOException {
    final Integer readResult = executeWithRetries(retryPolicyReturningInteger, supplier);
    return readResult;
}

/**
 * Runs a {@code byte[]}-producing read through {@code executeWithRetries} using the
 * {@code byte[]}-typed retry policy.
 *
 * @param supplier the read operation to execute
 * @return the array produced by {@code supplier}
 * @throws IOException if {@code executeWithRetries} reports a failure
 */
private byte[] executeWithRetriesAndReturnByteArray(CheckedSupplier<byte[]> supplier) throws IOException {
    final byte[] readResult = executeWithRetries(retryPolicyReturningByteArray, supplier);
    return readResult;
}


/**
 * Executes {@code supplier} through Failsafe under the given retry policy.
 *
 * <p>If the supplier throws a {@link ConnectionClosedException}, {@link EOFException},
 * {@link SocketException} or {@link SocketTimeoutException}, a warning is logged,
 * {@code openStream()} is called, and the same exception is rethrown so that it reaches
 * the retry policy. Any {@link FailsafeException} raised by the Failsafe call is logged
 * at error level and its cause is wrapped in a new {@link IOException}. Exceptions of
 * other types are not caught here.
 *
 * @param retryPolicy the policy governing the execution
 * @param supplier    the read operation to execute
 * @param <T>         the result type of the read operation
 * @return the value produced by {@code supplier}
 * @throws IOException wrapping the cause of a {@link FailsafeException}
 */
private <T> T executeWithRetries(RetryPolicy<T> retryPolicy, CheckedSupplier<T> supplier) throws IOException {
    // Wrap the caller's read so the stream is reset before the failure is handed back to Failsafe.
    final CheckedSupplier<T> readWithStreamReset = () -> {
        try {
            return supplier.get();
        } catch (ConnectionClosedException | EOFException | SocketException | SocketTimeoutException socketFailure) {
            LOG.warn("Resetting stream due to underlying socket exception", socketFailure);
            openStream();
            throw socketFailure;
        }
    };

    try {
        return Failsafe.with(retryPolicy).get(readWithStreamReset);
    } catch (FailsafeException failsafeFailure) {
        LOG.error("Failed to read with Retries", failsafeFailure);
        throw new IOException(failsafeFailure.getCause());
    }
}

}
Loading

0 comments on commit 62933f6

Please sign in to comment.