Skip to content

Commit

Permalink
S3 scan enhancements (opensearch-project#3049)
Browse files Browse the repository at this point in the history
* S3 scan enhancements

Signed-off-by: Asif Sohail Mohammed <[email protected]>

---------

Signed-off-by: Asif Sohail Mohammed <[email protected]>
  • Loading branch information
asifsmohammed authored Aug 7, 2023
1 parent 0582788 commit 5262eea
Show file tree
Hide file tree
Showing 26 changed files with 764 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ public class SourcePartition<T> {

private final String partitionKey;
private final T partitionState;
private final Long partitionClosedCount;

private SourcePartition(final Builder<T> builder) {
Objects.requireNonNull(builder.partitionKey);

this.partitionKey = builder.partitionKey;
this.partitionState = builder.partitionState;
this.partitionClosedCount = builder.partitionClosedCount;
}

public String getPartitionKey() {
Expand All @@ -34,6 +36,10 @@ public Optional<T> getPartitionState() {
return Optional.ofNullable(partitionState);
}

public Long getPartitionClosedCount() {
return partitionClosedCount;
}

public static <T> Builder<T> builder(Class<T> clazz) {
return new Builder<>(clazz);
}
Expand All @@ -42,6 +48,7 @@ public static class Builder<T> {

private String partitionKey;
private T partitionState;
private Long partitionClosedCount;

public Builder(Class<T> clazz) {

Expand All @@ -57,6 +64,11 @@ public Builder<T> withPartitionState(final T partitionState) {
return this;
}

public Builder<T> withPartitionClosedCount(final Long partitionClosedCount) {
this.partitionClosedCount = partitionClosedCount;
return this;
}

public SourcePartition<T> build() {
return new SourcePartition<T>(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.junit.jupiter.api.Test;

import java.util.Random;
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -31,15 +32,18 @@ void sourcePartitionBuilderWithNullPartitionThrowsNullPointerException() {
void sourcePartitionBuilder_returns_expected_SourcePartition() {
final String partitionKey = UUID.randomUUID().toString();
final String partitionState = UUID.randomUUID().toString();
final Long partitionClosedCount = new Random().nextLong();

final SourcePartition<String> sourcePartition = SourcePartition.builder(String.class)
.withPartitionKey(partitionKey)
.withPartitionState(partitionState)
.withPartitionClosedCount(partitionClosedCount)
.build();

assertThat(sourcePartition, notNullValue());
assertThat(sourcePartition.getPartitionKey(), equalTo(partitionKey));
assertThat(sourcePartition.getPartitionState().isPresent(), equalTo(true));
assertThat(sourcePartition.getPartitionState().get(), equalTo(partitionState));
assertThat(sourcePartition.getPartitionClosedCount(), equalTo(partitionClosedCount));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public Optional<SourcePartition<T>> getNextPartition(final Function<Map<String,
final SourcePartition<T> sourcePartition = SourcePartition.builder(partitionProgressStateClass)
.withPartitionKey(ownedPartitions.get().getSourcePartitionKey())
.withPartitionState(convertStringToPartitionProgressStateClass(ownedPartitions.get().getPartitionProgressState()))
.withPartitionClosedCount(ownedPartitions.get().getClosedCount())
.build();

partitionManager.setActivePartition(sourcePartition);
Expand Down
28 changes: 24 additions & 4 deletions data-prepper-plugins/s3-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ All Duration values are a string that represents a duration. They support ISO_86

* `disable_bucket_ownership_validation` (Optional) : Boolean - If set to true, then the S3 Source will not attempt to validate that the bucket is owned by the expected account. The only expected account is the same account which owns the SQS queue. Defaults to `false`.

* `delete_on_read` (Optional) : Boolean - If set to true, then the S3 Source will attempt to delete S3 objects after all the events from the S3 object are successfully acknowledged by all sinks. `acknowledgments` should be enabled for deleting S3 objects. Defaults to `false`.

### <a name="s3_select_configuration">S3 Select Configuration</a>

* `expression` (Required if s3_select enabled) : Provide s3 select query to process the data using S3 select for the particular bucket.
Expand Down Expand Up @@ -151,6 +153,7 @@ All Duration values are a string that represents a duration. They support ISO_86
* `start_time` (Optional) : Provide the start time to scan objects from all the buckets. This parameter defines a time range together with either end_time or range. Example: `2023-01-23T10:00:00`.
* `end_time` (Optional) : Provide the end time to scan objects from all the buckets. This parameter defines a time range together with either start_time or range. Example: `2023-01-23T10:00:00`.
* `range` (Optional) : Provide the duration to scan objects from all the buckets. This parameter defines a time range together with either start_time or end_time.
* `scheduling` (Optional): See [Scheduling Configuration](#scheduling_configuration) for details
* `bucket`: Provide S3 bucket information
* `name` (Required if bucket block is used): Provide S3 bucket name.
* `key_prefix` (Optional) : Provide include and exclude the list items.
Expand All @@ -162,6 +165,18 @@ All Duration values are a string that represents a duration. They support ISO_86

> Note: If a time range is not specified, all objects will be included by default. To set a time range, specify any two and only two configurations from start_time, end_time and range. The time range configured on a specific bucket will override the time range specified on the top level
### <a name="scheduling_configuration">Scheduling Configuration</a>

Schedule frequency and amount of times an object should be processed when using S3 Scan. For example,
a `rate` of `PT1H` and a `job_count` of 3 would result in each object getting processed 3 times, starting after source is ready
and then every hour after the first time the object is processed.

* `rate` (Optional) : A String that indicates the rate to process an S3 object based on the `job_count`.
Supports ISO_8601 notation Strings ("PT20.345S", "PT15M", etc.) as well as simple notation Strings for seconds ("60s") and milliseconds ("1500ms").
Defaults to 8 hours, and is only applicable when `job_count` is greater than 1.
* `job_count` (Optional) : An Integer that specifies how many times each S3 object should be processed. Defaults to 1.


### <a name="aws_configuration">AWS Configuration</a>

The AWS configuration is the same for both SQS and S3.
Expand All @@ -179,10 +194,12 @@ The following policy shows the necessary permissions for S3 source. `kms:Decrypt
"Sid": "s3policy",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"kms:Decrypt"
"s3:GetObject",
"s3:ListBucket",
"s3:DeleteObject",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"kms:Decrypt"
],
"Resource": "*"
}
Expand All @@ -204,6 +221,9 @@ The following policy shows the necessary permissions for S3 source. `kms:Decrypt
* `sqsMessagesDeleted` - The number of SQS messages deleted from the queue by the S3 Source.
* `sqsMessagesFailed` - The number of SQS messages that the S3 Source failed to parse.
* `sqsMessagesDeleteFailed` - The number of SQS messages that the S3 Source failed to delete from the SQS queue.
* `s3ObjectsDeleted` - The number of S3 objects deleted by the S3 source.
* `s3ObjectsDeleteFailed` - The number of S3 objects that the S3 source failed to delete.
* `acknowledgementSetCallbackCounter` - The number of times End-to-end acknowledgments created an acknowledgment set.


### Timers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private String getKeyString(final RecordsGenerator recordsGenerator, final int n

private void parseObject(final String key, final S3ObjectWorker objectUnderTest) throws IOException {
final S3ObjectReference s3ObjectReference = S3ObjectReference.bucketAndKey(bucket, key).build();
objectUnderTest.parseS3Object(s3ObjectReference, null);
objectUnderTest.parseS3Object(s3ObjectReference, null, null, null);
}

static class IntegrationTestArguments implements ArgumentsProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.noop.NoopTimer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
Expand All @@ -28,6 +34,8 @@
import org.opensearch.dataprepper.parser.model.SourceCoordinationConfig;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanScanOptions;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanSchedulingOptions;
import org.opensearch.dataprepper.plugins.source.configuration.S3SelectCSVOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3SelectJsonOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3SelectSerializationFormatOption;
Expand Down Expand Up @@ -63,12 +71,18 @@
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.S3ObjectDeleteWorker.S3_OBJECTS_DELETED_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.source.S3ObjectDeleteWorker.S3_OBJECTS_DELETE_FAILED_METRIC_NAME;
import static org.opensearch.dataprepper.plugins.source.ScanObjectWorker.ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME;

@ExtendWith(MockitoExtension.class)
public class S3ScanObjectWorkerIT {

private static final int TIMEOUT_IN_MILLIS = 200;
Expand All @@ -84,6 +98,21 @@ public class S3ScanObjectWorkerIT {
private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS));

private SourceCoordinator<S3SourceProgressState> sourceCoordinator;
@Mock
private S3SourceConfig s3SourceConfig;
@Mock
private AcknowledgementSetManager acknowledgementSetManager;
@Mock
private PluginMetrics pluginMetrics;
@Mock
private S3ScanScanOptions s3ScanScanOptions;
@Mock
private S3ScanSchedulingOptions s3ScanSchedulingOptions;

final Counter acknowledgementCounter = mock(Counter.class);
final Counter s3DeletedCounter = mock(Counter.class);
final Counter s3DeleteFailedCounter = mock(Counter.class);


private S3ObjectHandler createObjectUnderTest(final S3ObjectRequest s3ObjectRequest){
if(Objects.nonNull(s3ObjectRequest.getExpression()))
Expand Down Expand Up @@ -111,14 +140,14 @@ void setUp() {
final Counter counter = mock(Counter.class);
final DistributionSummary distributionSummary = mock(DistributionSummary.class);
final Timer timer = new NoopTimer(new Meter.Id("test", Tags.empty(), null, null, Meter.Type.TIMER));
when(s3ObjectPluginMetrics.getS3ObjectsFailedCounter()).thenReturn(counter);
when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(counter);
when(s3ObjectPluginMetrics.getS3ObjectsFailedAccessDeniedCounter()).thenReturn(counter);
when(s3ObjectPluginMetrics.getS3ObjectsFailedNotFoundCounter()).thenReturn(counter);
when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(distributionSummary);
when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(distributionSummary);
when(s3ObjectPluginMetrics.getS3ObjectSizeProcessedSummary()).thenReturn(distributionSummary);
when(s3ObjectPluginMetrics.getS3ObjectReadTimer()).thenReturn(timer);
lenient().when(s3ObjectPluginMetrics.getS3ObjectsFailedCounter()).thenReturn(counter);
lenient().when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(counter);
lenient().when(s3ObjectPluginMetrics.getS3ObjectsFailedAccessDeniedCounter()).thenReturn(counter);
lenient().when(s3ObjectPluginMetrics.getS3ObjectsFailedNotFoundCounter()).thenReturn(counter);
lenient().when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(distributionSummary);
lenient().when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(distributionSummary);
lenient().when(s3ObjectPluginMetrics.getS3ObjectSizeProcessedSummary()).thenReturn(distributionSummary);
lenient().when(s3ObjectPluginMetrics.getS3ObjectReadTimer()).thenReturn(timer);
bucketOwnerProvider = b -> Optional.empty();

final SourceCoordinationStore inMemoryStore = new InMemorySourceCoordinationStore(new PluginSetting("in_memory", Collections.emptyMap()));
Expand All @@ -129,7 +158,7 @@ void setUp() {
}

private void stubBufferWriter(final Consumer<Event> additionalEventAssertions, final String key) throws Exception {
doAnswer(a -> {
lenient().doAnswer(a -> {
final Collection<Record<Event>> recordsCollection = a.getArgument(0);
assertThat(recordsCollection.size(), greaterThanOrEqualTo(1));
for (Record<Event> eventRecord : recordsCollection) {
Expand Down Expand Up @@ -169,8 +198,22 @@ private ScanObjectWorker createObjectUnderTest(final RecordsGenerator recordsGen
.codec(recordsGenerator.getCodec())
.compressionType(shouldCompress ? CompressionType.GZIP : CompressionType.NONE)
.s3SelectResponseHandlerFactory(new S3SelectResponseHandlerFactory()).build();

when(pluginMetrics.counter(ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME)).thenReturn(acknowledgementCounter);
when(pluginMetrics.counter(S3_OBJECTS_DELETED_METRIC_NAME)).thenReturn(s3DeletedCounter);
when(pluginMetrics.counter(S3_OBJECTS_DELETE_FAILED_METRIC_NAME)).thenReturn(s3DeleteFailedCounter);
S3ObjectDeleteWorker s3ObjectDeleteWorker = new S3ObjectDeleteWorker(s3Client, pluginMetrics);

when(s3SourceConfig.getS3ScanScanOptions()).thenReturn(s3ScanScanOptions);
when(s3ScanScanOptions.getSchedulingOptions()).thenReturn(s3ScanSchedulingOptions);
lenient().when(s3ScanSchedulingOptions.getRate()).thenReturn(Duration.ofHours(1));
lenient().when(s3ScanSchedulingOptions.getCount()).thenReturn(1);

ExecutorService executor = Executors.newFixedThreadPool(2);
acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor);

return new ScanObjectWorker(s3Client,List.of(scanOptions),createObjectUnderTest(s3ObjectRequest)
,bucketOwnerProvider, sourceCoordinator);
,bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics);
}

@ParameterizedTest
Expand Down Expand Up @@ -267,6 +310,62 @@ void parseS3Object_correctly_with_bucket_scan_and_loads_data_into_Buffer(
executorService.shutdownNow();
}

@ParameterizedTest
@ValueSource(strings = {"true", "false"})
void parseS3Object_correctly_with_bucket_scan_and_loads_data_into_Buffer_and_deletes_s3_object(final boolean deleteS3Objects) throws Exception {
final RecordsGenerator recordsGenerator = new NewlineDelimitedRecordsGenerator();
final boolean shouldCompress = true;
final int numberOfRecords = 100;
final int numberOfRecordsToAccumulate = 50;

when(s3SourceConfig.isDeleteS3ObjectsOnRead()).thenReturn(deleteS3Objects);
String keyPrefix = "s3source/s3-scan/" + recordsGenerator.getFileExtension() + "/" + Instant.now().toEpochMilli();
final String key = getKeyString(keyPrefix,recordsGenerator, shouldCompress);
final String buketOptionYaml = "name: " + bucket + "\n" +
"key_prefix:\n" +
" include:\n" +
" - " + keyPrefix;

final ScanOptions.Builder startTimeAndEndTimeScanOptions = ScanOptions.builder()
.setStartDateTime(LocalDateTime.now().minus(Duration.ofMinutes(10)))
.setEndDateTime(LocalDateTime.now().plus(Duration.ofHours(1)));

List<ScanOptions.Builder> scanOptions = List.of(startTimeAndEndTimeScanOptions);
final ScanOptions.Builder s3ScanOptionsBuilder = scanOptions.get(0).setBucketOption(objectMapper.readValue(buketOptionYaml, S3ScanBucketOption.class));

s3ObjectGenerator.write(numberOfRecords, key, recordsGenerator, shouldCompress);
stubBufferWriter(recordsGenerator::assertEventIsCorrect, key);

final ScanObjectWorker scanObjectWorker = createObjectUnderTest(recordsGenerator,
numberOfRecordsToAccumulate,
recordsGenerator.getS3SelectExpression(),
shouldCompress,
s3ScanOptionsBuilder.build(),
ThreadLocalRandom.current().nextBoolean());


final int expectedWrites = numberOfRecords / numberOfRecordsToAccumulate;

final ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(scanObjectWorker::run);


await().atMost(Duration.ofSeconds(60)).until(() -> waitForAllRecordsToBeProcessed(numberOfRecords));

verify(buffer, times(expectedWrites)).writeAll(anyCollection(), eq(TIMEOUT_IN_MILLIS));
assertThat(recordsReceived, equalTo(numberOfRecords));

// wait for S3 objects to be deleted to verify metrics
Thread.sleep(500);

if (deleteS3Objects)
verify(s3DeletedCounter, times(1)).increment();
verifyNoMoreInteractions(s3DeletedCounter);
verifyNoInteractions(s3DeleteFailedCounter);

executorService.shutdownNow();
}

private String getKeyString(final String keyPrefix ,
final RecordsGenerator recordsGenerator,
final boolean shouldCompress) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private String getKeyString(final RecordsGenerator recordsGenerator, final int n

private void parseObject(final String key, final S3SelectObjectWorker objectUnderTest) throws IOException {
final S3ObjectReference s3ObjectReference = S3ObjectReference.bucketAndKey(bucket, key).build();
objectUnderTest.parseS3Object(s3ObjectReference,null);
objectUnderTest.parseS3Object(s3ObjectReference,null, null, null);
}

static class IntegrationTestArguments implements ArgumentsProvider {
Expand Down
Loading

0 comments on commit 5262eea

Please sign in to comment.