S3 Scan time range improvements (opensearch-project#2883)
* When no time range is set, default to scanning all objects; allow setting a time range for a specific bucket

Signed-off-by: Hai Yan <[email protected]>

oeyh committed Jun 26, 2023
1 parent cb5cb60 commit b180634
Showing 11 changed files with 477 additions and 127 deletions.
78 changes: 42 additions & 36 deletions data-prepper-plugins/s3-source/README.md
@@ -53,6 +53,8 @@ source-pipeline:
s3:
notification_type: sqs
compression: none
codec:
newline:
s3_select:
expression: "select * from s3object s LIMIT 10000"
expression_type: SQL
@@ -71,7 +73,6 @@ source-pipeline:
sts_role_arn: "arn:aws:iam::123456789012:role/Data-Prepper"
scan:
start_time: 2023-01-21T18:00:00
range: P90DT3H4M
end_time: 2023-04-21T18:00:00
buckets:
- bucket:
@@ -88,41 +89,7 @@

All duration values are strings that represent a duration. They support ISO-8601 notation ("PT20.345S", "PT15M", etc.) as well as simple notation for seconds ("60s") and milliseconds ("1500ms").
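
For illustration, a fragment (not a complete pipeline) showing both notations on the `sqs` options documented below; the values are placeholders:

```yaml
sqs:
  wait_time: "PT20S"   # ISO-8601 notation; equivalent to "20s" in simple notation
  poll_delay: "1500ms" # simple millisecond notation
```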

* `s3_select` : S3 Select Configuration.

* `expression` (Required if s3_select enabled) : Provide s3 select query to process the data using S3 select for the particular bucket.

* `expression_type` (Optional if s3_select enabled) : Provide s3 select query type to process the data using S3 select for the particular bucket.

* `compression_type` (Optional if s3_select enabled) : The compression algorithm to apply. May be one of: `none`, `gzip`. Defaults to `none`.

* `input_serialization` (Required if s3_select enabled) : Provide the s3 select file format (csv/json/Apache Parquet) Amazon S3 uses this format to parse object data into records and returns only records that match the specified SQL expression. You must also specify the data serialization format for the response.

* `csv` (Optional) : Provide the csv configuration to process the csv data.

* `file_header_info` (Required if csv block is enabled) : Provide CSV Header example : `use` , `none` , `ignore`. Default is `use`.

* `quote_escape` (Optional) : Provide quote_escape attribute example : `,` , `.`.

* `comments` (Optional) : Provide comments attribute example : `#`. Default is `#`.

* `json` (Optional) : Provide the json configuration to process the json data.

* `type` (Optional) : Provide the type attribute to process the json type data example: `Lines` , `Document` Default is `Document`.

* `bucket_name` : Provide S3 bucket name.

* `key_prefix` (Optional) : Provide include and exclude the list items.

* `include` (Optional) : Provide the list of include key path prefix.

* `exclude_suffix` (Optional) : Provide the list of suffix to exclude items.

* `start_time` (Optional) : Provide the start time to scan the data. for example the files updated between start_time and end_time will be scanned. example : `2023-01-23T10:00:00`.

* `end_time` (Optional) : Provide the end time to scan the data. for example the files updated between start_time and end_time will be scanned. example : `2023-01-23T10:00:00`.

* `range` (Optional) : Provide the duration to scan the data example : `day` , `week` , `month` , `year`.
* `s3_select` : S3 Select Configuration. See [S3 Select Configuration](#s3_select_configuration) for details.

* `notification_type` (Optional) : Must be `sqs`.

@@ -134,6 +101,8 @@ All Duration values are a string that represents a duration. They support ISO_86

* `sqs` (Optional) : The SQS configuration. See [SQS Configuration](#sqs_configuration) for details.

* `scan` (Optional): S3 Scan Configuration. See [S3 Scan Configuration](#s3_scan_configuration) for details.

* `aws` (Optional) : AWS configurations. See [AWS Configuration](#aws_configuration) for details.

* `acknowledgments` (Optional) : Enables end-to-end acknowledgments. If set to `true`, an SQS message is deleted only after all events from that message are successfully acknowledged by all sinks. Defaults to `false`.
@@ -148,6 +117,28 @@ All Duration values are a string that represents a duration. They support ISO_86

* `disable_bucket_ownership_validation` (Optional) : Boolean - If set to true, then the S3 Source will not attempt to validate that the bucket is owned by the expected account. The only expected account is the same account which owns the SQS queue. Defaults to `false`.

### <a name="s3_select_configuration">S3 Select Configuration</a>

* `expression` (Required if `s3_select` is enabled) : The S3 Select query used to process objects in the bucket.

* `expression_type` (Optional if `s3_select` is enabled) : The type of the S3 Select query, for example `SQL`.

* `compression_type` (Optional if `s3_select` is enabled) : The compression algorithm to apply. May be one of: `none`, `gzip`. Defaults to `none`.

* `input_serialization` (Required if `s3_select` is enabled) : The format of the input objects (csv, json, or Apache Parquet). Amazon S3 uses this format to parse object data into records and returns only records that match the specified SQL expression. You must also specify the data serialization format for the response.

* `csv` (Optional) : The CSV configuration for processing CSV data.

  * `file_header_info` (Required if the csv block is enabled) : How to treat the CSV header row. One of `use`, `none`, or `ignore`. Default is `use`.

  * `quote_escape` (Optional) : The quote escape character, for example `,` or `.`.

  * `comments` (Optional) : The comment character. Default is `#`.

* `json` (Optional) : The JSON configuration for processing JSON data.

  * `type` (Optional) : The JSON type: `Lines` or `Document`. Default is `Document`.
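
An untested sketch combining the options above into one `s3_select` block. It assumes `input_serialization` takes the format name with `csv` as a sibling block, matching the `S3SelectSerializationFormatOption` and `S3SelectCSVOption` types used in this commit:

```yaml
s3_select:
  expression: "select * from s3object s LIMIT 10000"
  expression_type: SQL
  compression_type: none
  input_serialization: csv
  csv:
    file_header_info: use
    comments: "#"
```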

### <a name="sqs_configuration">SQS Configuration</a>

* `queue_url` (Required) : The SQS queue URL of the queue to read from.
@@ -156,6 +147,21 @@ All Duration values are a string that represents a duration. They support ISO_86
* `wait_time` (Optional) : Duration - The time to wait for long-polling on the SQS API. Defaults to 20 seconds.
* `poll_delay` (Optional) : Duration - A delay to place between reading and processing a batch of SQS messages and making a subsequent request. Defaults to 0 seconds.
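
A minimal `sqs` sketch using only the options listed above; the queue URL is a placeholder:

```yaml
sqs:
  queue_url: "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue" # placeholder
  wait_time: "20s"
  poll_delay: "0s"
```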

### <a name="s3_scan_configuration">S3 Scan Configuration</a>
* `start_time` (Optional) : The start time for scanning objects from all buckets, for example `2023-01-23T10:00:00`. This parameter defines a time range together with either `end_time` or `range`.
* `end_time` (Optional) : The end time for scanning objects from all buckets, for example `2023-01-23T10:00:00`. This parameter defines a time range together with either `start_time` or `range`.
* `range` (Optional) : The duration for scanning objects from all buckets. This parameter defines a time range together with either `start_time` or `end_time`.
* `bucket` : The S3 bucket information.
  * `name` (Required if the bucket block is used) : The S3 bucket name.
  * `key_prefix` (Optional) : The key prefix options for including and excluding objects.
    * `include` (Optional) : The list of key path prefixes to include.
    * `exclude_suffix` (Optional) : The list of key suffixes to exclude.
  * `start_time` (Optional) : The start time for scanning objects from the current bucket, for example `2023-01-23T10:00:00`. This parameter defines a time range together with either `end_time` or `range`.
  * `end_time` (Optional) : The end time for scanning objects from the current bucket, for example `2023-01-23T10:00:00`. This parameter defines a time range together with either `start_time` or `range`.
  * `range` (Optional) : The duration for scanning objects from the current bucket. This parameter defines a time range together with either `start_time` or `end_time`.

> Note: If a time range is not specified, all objects are included by default. To set a time range, specify exactly two of `start_time`, `end_time`, and `range`. A time range configured on a specific bucket overrides the time range specified at the top level.
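
A hedged sketch of these rules, with placeholder bucket names: the top level sets `start_time` + `end_time`, the first bucket inherits that range, and the second overrides it with its own `start_time` + `range`:

```yaml
scan:
  start_time: 2023-01-21T18:00:00 # top-level time range
  end_time: 2023-04-21T18:00:00   # (exactly two of start_time, end_time, range)
  buckets:
    - bucket:
        name: my-first-bucket # placeholder; inherits the top-level time range
        key_prefix:
          include:
            - logs/
          exclude_suffix:
            - .csv
    - bucket:
        name: my-second-bucket # placeholder
        start_time: 2023-03-01T00:00:00 # bucket-level range overrides the top level
        range: P7D                      # with start_time, defines a 7-day window
```
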
### <a name="aws_configuration">AWS Configuration</a>

The AWS configuration is the same for both SQS and S3.
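
A minimal sketch mirroring the `aws` block from the pipeline example near the top of this README; `region` is an assumed option name here, and the role ARN is the sample value from that example:

```yaml
aws:
  region: us-east-1 # assumed option name
  sts_role_arn: "arn:aws:iam::123456789012:role/Data-Prepper"
```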
@@ -27,7 +27,7 @@
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.parser.model.SourceCoordinationConfig;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanKeyPathOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3SelectCSVOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3SelectJsonOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3SelectSerializationFormatOption;
@@ -187,24 +187,25 @@ void parseS3Object_parquet_correctly_with_bucket_scan_and_loads_data_into_Buffer
final RecordsGenerator recordsGenerator = new ParquetRecordsGenerator();
final String keyPrefix = "s3source/s3-scan/" + recordsGenerator.getFileExtension() + "/" + Instant.now().toEpochMilli();

final String includeOptionsYaml = " include:\n" +
" - "+keyPrefix+"\n" +
" exclude_suffix:\n" +
" - .csv\n" +
" - .json\n" +
" - .txt\n" +
" - .gz";

final String bucketOptionYaml = "name: " + bucket + "\n" +
"key_prefix:\n" +
" include:\n" +
" - " + keyPrefix + "\n" +
" exclude_suffix:\n" +
" - .csv\n" +
" - .json\n" +
" - .txt\n" +
" - .gz";

final String key = getKeyString(keyPrefix, recordsGenerator, Boolean.FALSE);

s3ObjectGenerator.write(numberOfRecords, key, recordsGenerator, Boolean.FALSE);
stubBufferWriter(recordsGenerator::assertEventIsCorrect, key);
final ScanOptions startTimeAndRangeScanOptions = new ScanOptions.Builder()
.setBucket(bucket)
.setBucketOption(objectMapper.readValue(bucketOptionYaml, S3ScanBucketOption.class))
.setStartDateTime(LocalDateTime.now().minusDays(1))
.setRange(Duration.parse("P2DT10M"))
.setS3ScanKeyPathOption(objectMapper.readValue(includeOptionsYaml, S3ScanKeyPathOption.class)).build();
.build();

final ScanObjectWorker objectUnderTest = createObjectUnderTest(recordsGenerator,
numberOfRecordsToAccumulate,
@@ -234,9 +235,11 @@ void parseS3Object_correctly_with_bucket_scan_and_loads_data_into_Buffer(
final ScanOptions.Builder scanOptions) throws Exception {
String keyPrefix = "s3source/s3-scan/" + recordsGenerator.getFileExtension() + "/" + Instant.now().toEpochMilli();
final String key = getKeyString(keyPrefix,recordsGenerator, shouldCompress);
final String includeOptionsYaml = " include:\n" +
" - "+keyPrefix;
scanOptions.setS3ScanKeyPathOption(objectMapper.readValue(includeOptionsYaml, S3ScanKeyPathOption.class));
final String bucketOptionYaml = "name: " + bucket + "\n" +
"key_prefix:\n" +
" include:\n" +
" - " + keyPrefix;
scanOptions.setBucketOption(objectMapper.readValue(bucketOptionYaml, S3ScanBucketOption.class));

s3ObjectGenerator.write(numberOfRecords, key, recordsGenerator, shouldCompress);
stubBufferWriter(recordsGenerator::assertEventIsCorrect, key);
@@ -284,19 +287,14 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte
final List<Integer> recordsToAccumulateList = List.of( 100);
final List<Boolean> booleanList = List.of(Boolean.TRUE);

final String bucket = System.getProperty("tests.s3source.bucket");
final ScanOptions.Builder startTimeAndRangeScanOptions = new ScanOptions.Builder()
final ScanOptions.Builder startTimeAndRangeScanOptions = ScanOptions.builder()
.setStartDateTime(LocalDateTime.now())
.setBucket(bucket)
.setRange(Duration.parse("P2DT10H"));
final ScanOptions.Builder endTimeAndRangeScanOptions = new ScanOptions.Builder()
final ScanOptions.Builder endTimeAndRangeScanOptions = ScanOptions.builder()
.setEndDateTime(LocalDateTime.now().plus(Duration.ofHours(1)))
.setBucket(bucket)
.setRange(Duration.parse("P7DT10H"));

final ScanOptions.Builder startTimeAndEndTimeScanOptions = new ScanOptions.Builder()
final ScanOptions.Builder startTimeAndEndTimeScanOptions = ScanOptions.builder()
.setStartDateTime(LocalDateTime.now().minus(Duration.ofMinutes(10)))
.setBucket(bucket)
.setEndDateTime(LocalDateTime.now().plus(Duration.ofHours(1)));

List<ScanOptions.Builder> scanOptions = List.of(startTimeAndRangeScanOptions,endTimeAndRangeScanOptions,startTimeAndEndTimeScanOptions);
@@ -47,10 +47,10 @@ public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap)

for (final ScanOptions scanOptions : scanOptionsList) {
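// Build one ListObjectsV2 request per configured bucket, applying its include prefixes and exclude suffixes.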
final List<String> excludeItems = new ArrayList<>();
final S3ScanKeyPathOption s3ScanKeyPathOption = scanOptions.getS3ScanKeyPathOption();
final S3ScanKeyPathOption s3ScanKeyPathOption = scanOptions.getBucketOption().getkeyPrefix();
final ListObjectsV2Request.Builder listObjectsV2Request = ListObjectsV2Request.builder()
.bucket(scanOptions.getBucket());
bucketOwnerProvider.getBucketOwner(scanOptions.getBucket())
.bucket(scanOptions.getBucketOption().getName());
bucketOwnerProvider.getBucketOwner(scanOptions.getBucketOption().getName())
.ifPresent(listObjectsV2Request::expectedBucketOwner);

if (Objects.nonNull(s3ScanKeyPathOption) && Objects.nonNull(s3ScanKeyPathOption.getS3ScanExcludeSuffixOptions()))
@@ -59,12 +59,12 @@ public List<PartitionIdentifier> apply(final Map<String, Object> globalStateMap)
if (Objects.nonNull(s3ScanKeyPathOption) && Objects.nonNull(s3ScanKeyPathOption.getS3scanIncludeOptions()))
s3ScanKeyPathOption.getS3scanIncludeOptions().forEach(includePath -> {
listObjectsV2Request.prefix(includePath);
objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request, scanOptions.getBucket(),
scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime()));
objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request,
scanOptions.getBucketOption().getName(), scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime()));
});
else
objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request, scanOptions.getBucket(),
scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime()));
objectsToProcess.addAll(listFilteredS3ObjectsForBucket(excludeItems, listObjectsV2Request,
scanOptions.getBucketOption().getName(), scanOptions.getUseStartDateTime(), scanOptions.getUseEndDateTime()));
}

return objectsToProcess;
@@ -106,6 +106,9 @@ private LocalDateTime instantToLocalDateTime(final Instant instant) {
private boolean isKeyMatchedBetweenTimeRange(final LocalDateTime lastModifiedTime,
final LocalDateTime startDateTime,
final LocalDateTime endDateTime){
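// If either bound of the time range is missing, no range was configured; include every object.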
if (Objects.isNull(startDateTime) || Objects.isNull(endDateTime)) {
return true;
}
return lastModifiedTime.isAfter(startDateTime) && lastModifiedTime.isBefore(endDateTime);
}
}
@@ -5,7 +5,6 @@
package org.opensearch.dataprepper.plugins.source;

import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOption;
import org.opensearch.dataprepper.plugins.source.configuration.S3ScanBucketOptions;
import org.opensearch.dataprepper.plugins.source.ownership.BucketOwnerProvider;

@@ -67,12 +66,11 @@ List<ScanOptions> getScanOptions() {
}

private void buildScanOptions(final List<ScanOptions> scanOptionsList, final S3ScanBucketOptions scanBucketOptions) {
final S3ScanBucketOption s3ScanBucketOption = scanBucketOptions.getS3ScanBucketOption();
scanOptionsList.add(new ScanOptions.Builder()
scanOptionsList.add(ScanOptions.builder()
.setStartDateTime(startDateTime)
.setEndDateTime(endDateTime)
.setRange(range)
.setBucket(s3ScanBucketOption.getName())
.setS3ScanKeyPathOption(s3ScanBucketOption.getkeyPrefix()).build());
.setBucketOption(scanBucketOptions.getS3ScanBucketOption())
.build());
}
}
@@ -39,6 +39,7 @@ public class S3SourceConfig {
private PluginModel codec;

@JsonProperty("sqs")
@Valid
private SqsOptions sqsOptions;

@JsonProperty("aws")
@@ -64,9 +65,11 @@ public class S3SourceConfig {
@JsonProperty("metadata_root_key")
private String metadataRootKey = DEFAULT_METADATA_ROOT_KEY;
@JsonProperty("s3_select")
@Valid
private S3SelectOptions s3SelectOptions;

@JsonProperty("scan")
@Valid
private S3ScanScanOptions s3ScanScanOptions;

@AssertTrue(message = "A codec is required for reading objects.")