Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/shenkw1/data-prepper into w…
Browse files Browse the repository at this point in the history
…hitespace
  • Loading branch information
shenkw1 committed Jul 11, 2023
2 parents 1a656b8 + 4cba22d commit 00c2d74
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ private void parseParquetFile(final InputFile inputFile, final Consumer<Record<E

eventConsumer.accept(new Record<>(event));
}
} catch (Exception parquetException){
LOG.error("An exception occurred while parsing parquet InputStream ", parquetException);
} catch (Exception e){
LOG.error("An exception occurred while parsing parquet InputStream ", e);
throw new IOException(e);
}
}

Expand Down
2 changes: 2 additions & 0 deletions data-prepper-plugins/s3-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ dependencies {
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2'
implementation 'org.xerial.snappy:snappy-java:1.1.10.1'
implementation 'org.apache.parquet:parquet-common:1.12.3'
implementation 'dev.failsafe:failsafe:3.3.2'
implementation 'org.apache.httpcomponents:httpcore:4.4.15'
testImplementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation 'com.github.tomakehurst:wiremock:3.0.0-beta-8'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,14 @@
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;

import java.time.Duration;

public class S3InputFile implements InputFile {

private static final Duration DEFAULT_RETRY_DELAY = Duration.ofSeconds(6);

private static final int DEFAULT_RETRIES = 10;

private final S3Client s3Client;

private final S3ObjectReference s3ObjectReference;
Expand Down Expand Up @@ -42,8 +48,8 @@ public long getLength() {
*/
@Override
public SeekableInputStream newStream() {

return new S3InputStream(s3Client, s3ObjectReference, getMetadata(), s3ObjectPluginMetrics);
return new S3InputStream(
s3Client, s3ObjectReference, getMetadata(), s3ObjectPluginMetrics, DEFAULT_RETRY_DELAY, DEFAULT_RETRIES);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package org.opensearch.dataprepper.plugins.source;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.LongAdder;

import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import dev.failsafe.RetryPolicy;
import dev.failsafe.function.CheckedSupplier;
import org.apache.http.ConnectionClosedException;
import org.apache.parquet.io.SeekableInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -19,8 +18,25 @@
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

class S3InputStream extends SeekableInputStream {

static final List<Class<? extends Throwable>> RETRYABLE_EXCEPTIONS = List.of(
ConnectionClosedException.class,
EOFException.class,
SocketException.class,
SocketTimeoutException.class
);

private static final int COPY_BUFFER_SIZE = 8192;

private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class);
Expand Down Expand Up @@ -52,11 +68,17 @@ class S3InputStream extends SeekableInputStream {

private boolean closed = false;

private RetryPolicy<byte[]> retryPolicyReturningByteArray;

private RetryPolicy<Integer> retryPolicyReturningInteger;

public S3InputStream(
final S3Client s3Client,
final S3ObjectReference s3ObjectReference,
final HeadObjectResponse metadata,
final S3ObjectPluginMetrics s3ObjectPluginMetrics
final S3Client s3Client,
final S3ObjectReference s3ObjectReference,
final HeadObjectResponse metadata,
final S3ObjectPluginMetrics s3ObjectPluginMetrics,
final Duration retryDelay,
final int retries
) {
this.s3Client = s3Client;
this.s3ObjectReference = s3ObjectReference;
Expand All @@ -65,10 +87,23 @@ public S3InputStream(
this.bytesCounter = new LongAdder();

this.getObjectRequestBuilder = GetObjectRequest.builder()
.bucket(this.s3ObjectReference.getBucketName())
.key(this.s3ObjectReference.getKey());
.bucket(this.s3ObjectReference.getBucketName())
.key(this.s3ObjectReference.getKey());

this.retryPolicyReturningByteArray = RetryPolicy.<byte[]>builder()
.handle(RETRYABLE_EXCEPTIONS)
.withDelay(retryDelay)
.withMaxRetries(retries)
.build();

this.retryPolicyReturningInteger = RetryPolicy.<Integer>builder()
.handle(RETRYABLE_EXCEPTIONS)
.withDelay(retryDelay)
.withMaxRetries(retries)
.build();
}


// Implement all InputStream methods first:

/**
Expand Down Expand Up @@ -129,7 +164,7 @@ public int read() throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

final int byteRead = stream.read();
final int byteRead = executeWithRetriesAndReturnInt(() -> stream.read());

if (byteRead != -1) {
pos += 1;
Expand Down Expand Up @@ -165,7 +200,7 @@ public int read(byte[] b, int off, int len) throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

final int bytesRead = stream.read(b, off, len);
final int bytesRead = executeWithRetriesAndReturnInt(() -> stream.read(b, off, len));

if (bytesRead > 0) {
pos += bytesRead;
Expand All @@ -186,7 +221,7 @@ public byte[] readAllBytes() throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

final byte[] bytesRead = stream.readAllBytes();
final byte[] bytesRead = executeWithRetriesAndReturnByteArray(() -> stream.readAllBytes());

pos += bytesRead.length;
next += bytesRead.length;
Expand All @@ -208,7 +243,7 @@ public int readNBytes(byte[] b, int off, int len) throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

final int bytesRead = stream.readNBytes(b, off, len);
final int bytesRead = executeWithRetriesAndReturnInt(() -> stream.readNBytes(b, off, len));

if (bytesRead > 0) {
pos += bytesRead;
Expand All @@ -229,7 +264,7 @@ public byte[] readNBytes(int len) throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

final byte[] bytesRead = stream.readNBytes(len);
final byte[] bytesRead = executeWithRetriesAndReturnByteArray(() -> stream.readNBytes(len));

pos += bytesRead.length;
next += bytesRead.length;
Expand Down Expand Up @@ -332,7 +367,7 @@ public void readFully(byte[] bytes, int start, int len) throws IOException {
Preconditions.checkState(!closed, "Cannot read: already closed");
positionStream();

int bytesRead = readFully(stream, bytes, start, len);
final int bytesRead = executeWithRetriesAndReturnInt(() -> readFully(stream, bytes, start, len));

if (bytesRead > 0) {
this.pos += bytesRead;
Expand Down Expand Up @@ -360,9 +395,9 @@ public int read(ByteBuffer buf) throws IOException {

int bytesRead = 0;
if (buf.hasArray()) {
bytesRead = readHeapBuffer(stream, buf);
bytesRead = executeWithRetriesAndReturnInt(() -> readHeapBuffer(stream, buf));
} else {
bytesRead = readDirectBuffer(stream, buf, temp);
bytesRead = executeWithRetriesAndReturnInt(() -> readDirectBuffer(stream, buf, temp));
}

if (bytesRead > 0) {
Expand Down Expand Up @@ -393,9 +428,9 @@ public void readFully(ByteBuffer buf) throws IOException {

int bytesRead = 0;
if (buf.hasArray()) {
bytesRead = readFullyHeapBuffer(stream, buf);
bytesRead = executeWithRetriesAndReturnInt(() -> readFullyHeapBuffer(stream, buf));
} else {
bytesRead = readFullyDirectBuffer(stream, buf, temp);
bytesRead = executeWithRetriesAndReturnInt(() -> readFullyDirectBuffer(stream, buf, temp));
}

if (bytesRead > 0) {
Expand Down Expand Up @@ -612,9 +647,7 @@ static int readFullyDirectBuffer(InputStream f, ByteBuffer buf, byte[] temp) thr
while (nextReadLength > 0 && (bytesRead = f.read(temp, 0, nextReadLength)) >= 0) {
buf.put(temp, 0, bytesRead);
nextReadLength = Math.min(buf.remaining(), temp.length);
if (bytesRead >= 0) {
totalBytesRead += bytesRead;
}
totalBytesRead += bytesRead;
}

if (bytesRead < 0 && buf.remaining() > 0) {
Expand All @@ -632,4 +665,32 @@ private void recordS3Exception(final S3Exception ex) {
s3ObjectPluginMetrics.getS3ObjectsFailedAccessDeniedCounter().increment();
}
}

private int executeWithRetriesAndReturnInt(CheckedSupplier<Integer> supplier) throws IOException {
return executeWithRetries(retryPolicyReturningInteger, supplier);
}

private byte[] executeWithRetriesAndReturnByteArray(CheckedSupplier<byte[]> supplier) throws IOException {
return executeWithRetries(retryPolicyReturningByteArray, supplier);
}


private <T> T executeWithRetries(RetryPolicy<T> retryPolicy, CheckedSupplier<T> supplier) throws IOException {
try {
return Failsafe.with(retryPolicy).get(() -> {
try {
return supplier.get();
} catch (ConnectionClosedException | EOFException | SocketException | SocketTimeoutException e) {
LOG.warn("Resetting stream due to underlying socket exception", e);
openStream();
throw e;
}
});
} catch (FailsafeException e) {
LOG.error("Failed to read with Retries", e);
throw new IOException(e.getCause());
}

}

}
Loading

0 comments on commit 00c2d74

Please sign in to comment.