Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support start_time or range options for the first scan of scheduled s3 scan #4929

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ private List<PartitionIdentifier> listFilteredS3ObjectsForBucket(final List<Stri
final LocalDateTime endDateTime,
final Map<String, Object> globalStateMap) {
final Instant previousScanTime = globalStateMap.get(bucket) != null ? Instant.parse((String) globalStateMap.get(bucket)) : null;
final boolean isFirstScan = previousScanTime == null;
final List<PartitionIdentifier> allPartitionIdentifiers = new ArrayList<>();
ListObjectsV2Response listObjectsV2Response = null;
do {
Expand All @@ -124,7 +125,7 @@ private List<PartitionIdentifier> listFilteredS3ObjectsForBucket(final List<Stri
.filter(keyTimestampPair -> !keyTimestampPair.left().endsWith("/"))
.filter(keyTimestampPair -> excludeKeyPaths.stream()
.noneMatch(excludeItem -> keyTimestampPair.left().endsWith(excludeItem)))
.filter(keyTimestampPair -> isKeyMatchedBetweenTimeRange(keyTimestampPair.right(), startDateTime, endDateTime))
.filter(keyTimestampPair -> isKeyMatchedBetweenTimeRange(keyTimestampPair.right(), startDateTime, endDateTime, isFirstScan))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if S3 scan api itself can take this filter? If yes, that should help filter out these records at the S3 end itself.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah would be nice to do this server side. ListObjectsV2 does not support it though (https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html)

.map(Pair::left)
.map(objectKey -> PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build())
.collect(Collectors.toList()));
Expand Down Expand Up @@ -166,8 +167,9 @@ private LocalDateTime instantToLocalDateTime(final Instant instant) {
*/
private boolean isKeyMatchedBetweenTimeRange(final LocalDateTime lastModifiedTime,
final LocalDateTime startDateTime,
final LocalDateTime endDateTime) {
if (Objects.nonNull(schedulingOptions)) {
final LocalDateTime endDateTime,
final boolean isFirstScan) {
if (!isFirstScan && schedulingOptions != null) {
return true;
} else if (Objects.isNull(startDateTime) && Objects.isNull(endDateTime)) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,14 @@ public boolean hasValidTimeAndRangeOptions() {
return (startTime != null || endTime != null) && range != null;
}

@AssertTrue(message = "start_time, end_time, and range are not valid options when using scheduling with s3 scan")
@AssertTrue(message = "end_time is not a valid option when using scheduling with s3 scan. One of start_time or range must be used for scheduled scan.")
public boolean hasValidTimeOptionsWithScheduling() {
return !Objects.nonNull(schedulingOptions) || Stream.of(startTime, endTime, range).noneMatch(Objects::nonNull);

if (schedulingOptions != null && ((startTime != null && range != null) || endTime != null)) {
return false;
}

return true;
}

public Duration getRange() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -44,6 +45,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -289,6 +291,67 @@ void getNextPartition_supplier_with_scheduling_options_returns_expected_Partitio
verify(listObjectsResponse, times(4)).contents();
}


@Test
void scheduled_scan_filters_on_start_time_and_end_time_for_the_first_scan_and_does_not_filter_on_subsequent_scans() {
schedulingOptions = mock(S3ScanSchedulingOptions.class);
given(schedulingOptions.getCount()).willReturn(2);

final String firstScanBucket = "bucket-one";
final String notFirstScanBucket = "bucket-two";

final Map<String, Object> globalStateMap = new HashMap<>();
globalStateMap.put(firstScanBucket, null);
globalStateMap.put(notFirstScanBucket, "2024-09-07T20:43:34.384822Z");
globalStateMap.put(SCAN_COUNT, 0);

final LocalDateTime startTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(1725907846000L), ZoneId.systemDefault());
final LocalDateTime endTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(1725907849100L), ZoneId.systemDefault());

final ScanOptions firstBucketScanOptions = mock(ScanOptions.class);
final S3ScanBucketOption firstBucketScanBucketOption = mock(S3ScanBucketOption.class);
given(firstBucketScanOptions.getBucketOption()).willReturn(firstBucketScanBucketOption);
given(firstBucketScanBucketOption.getName()).willReturn(firstScanBucket);
given(firstBucketScanOptions.getUseStartDateTime()).willReturn(startTime);
given(firstBucketScanOptions.getUseEndDateTime()).willReturn(endTime);
scanOptionsList.add(firstBucketScanOptions);

final ScanOptions notFirstScanOptions = mock(ScanOptions.class);
final S3ScanBucketOption notFirstScanBucketOption = mock(S3ScanBucketOption.class);
given(notFirstScanOptions.getBucketOption()).willReturn(notFirstScanBucketOption);
given(notFirstScanBucketOption.getName()).willReturn(notFirstScanBucket);
given(notFirstScanOptions.getUseStartDateTime()).willReturn(startTime);
given(notFirstScanOptions.getUseEndDateTime()).willReturn(endTime);
scanOptionsList.add(notFirstScanOptions);

final ListObjectsV2Response listObjectsResponse = mock(ListObjectsV2Response.class);
final List<S3Object> s3ObjectsList = new ArrayList<>();

final Instant objectNotBetweenStartAndEndTime = Instant.ofEpochMilli(1725907846000L).minus(500L, TimeUnit.SECONDS.toChronoUnit());
final S3Object validObject = mock(S3Object.class);
given(validObject.key()).willReturn("valid");
given(validObject.lastModified()).willReturn(objectNotBetweenStartAndEndTime);
s3ObjectsList.add(validObject);

final List<PartitionIdentifier> expectedPartitionIdentifiers = new ArrayList<>();
expectedPartitionIdentifiers.add(PartitionIdentifier.builder().withPartitionKey(notFirstScanBucket + "|" + validObject.key()).build());

given(listObjectsResponse.contents())
.willReturn(s3ObjectsList);

given(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).willReturn(listObjectsResponse);

final Function<Map<String, Object>, List<PartitionIdentifier>> partitionCreationSupplier = createObjectUnderTest();

final List<PartitionIdentifier> firstScanPartitions = partitionCreationSupplier.apply(globalStateMap);
assertThat(firstScanPartitions.size(), equalTo(expectedPartitionIdentifiers.size()));
assertThat(firstScanPartitions.stream().map(PartitionIdentifier::getPartitionKey).collect(Collectors.toList()),
containsInAnyOrder(expectedPartitionIdentifiers.stream().map(PartitionIdentifier::getPartitionKey).map(Matchers::equalTo).collect(Collectors.toList())));

final List<PartitionIdentifier> secondScanPartitions = partitionCreationSupplier.apply(globalStateMap);
assertThat(secondScanPartitions.isEmpty(), equalTo(true));
}

@Test
void getNextPartition_with_folder_partitioning_enabled_returns_the_expected_partition_identifiers() {
folderPartitioningOptions = mock(FolderPartitioningOptions.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,60 @@ public void s3scan_options_test_with_scan_yaml_configuration_test() throws JsonP
assertThat(s3ScanScanOptions.getBuckets().get(0).getS3ScanBucketOption().getS3ScanFilter().getS3ScanExcludeSuffixOptions().get(0),
equalTo(".jpeg"));
}

@Test
public void s3scan_options_with_scheduled_scan_does_not_allow_end_time() throws JsonProcessingException {
final String scanYaml = " start_time: 2023-01-21T18:00:00\n" +
" end_time: 2023-04-21T18:00:00\n" +
" scheduling: \n" +
" count: 1\n" +
" buckets:\n" +
" - bucket:\n" +
" name: test-s3-source-test-output\n" +
" filter:\n" +
" include_prefix:\n" +
" - bucket2\n" +
" exclude_suffix:\n" +
" - .jpeg";
final S3ScanScanOptions s3ScanScanOptions = objectMapper.readValue(scanYaml, S3ScanScanOptions.class);
assertThat(s3ScanScanOptions.getStartTime(),equalTo(LocalDateTime.parse("2023-01-21T18:00:00")));
assertThat(s3ScanScanOptions.getEndTime(),equalTo(LocalDateTime.parse("2023-04-21T18:00:00")));
assertThat(s3ScanScanOptions.getBuckets(),instanceOf(List.class));
assertThat(s3ScanScanOptions.getBuckets().get(0).getS3ScanBucketOption().getName(),equalTo("test-s3-source-test-output"));
assertThat(s3ScanScanOptions.getBuckets().get(0).getS3ScanBucketOption().getS3ScanFilter().getS3ScanExcludeSuffixOptions(),instanceOf(List.class));
assertThat(s3ScanScanOptions.getBuckets().get(0).getS3ScanBucketOption().getS3ScanFilter().getS3scanIncludePrefixOptions(),instanceOf(List.class));
assertThat(s3ScanScanOptions.getBuckets().get(0).getS3ScanBucketOption().getS3ScanFilter().getS3scanIncludePrefixOptions().get(0),
equalTo("bucket2"));
assertThat(s3ScanScanOptions.getBuckets().get(0).getS3ScanBucketOption().getS3ScanFilter().getS3ScanExcludeSuffixOptions().get(0),
equalTo(".jpeg"));

assertThat(s3ScanScanOptions.hasValidTimeOptionsWithScheduling(), equalTo(false));
}

@Test
public void s3scan_options_with_scheduled_scan_allows_start_time() throws JsonProcessingException {
final String scanYaml = " start_time: 2023-01-21T18:00:00\n" +
" scheduling: \n" +
" count: 1\n" +
" buckets:\n" +
" - bucket:\n" +
" name: test-s3-source-test-output\n" +
" filter:\n" +
" include_prefix:\n" +
" - bucket2\n" +
" exclude_suffix:\n" +
" - .jpeg";
final S3ScanScanOptions s3ScanScanOptions = objectMapper.readValue(scanYaml, S3ScanScanOptions.class);
assertThat(s3ScanScanOptions.getStartTime(),equalTo(LocalDateTime.parse("2023-01-21T18:00:00")));
assertThat(s3ScanScanOptions.getBuckets(),instanceOf(List.class));
assertThat(s3ScanScanOptions.getBuckets().get(0).getS3ScanBucketOption().getName(),equalTo("test-s3-source-test-output"));
assertThat(s3ScanScanOptions.getBuckets().get(0).getS3ScanBucketOption().getS3ScanFilter().getS3ScanExcludeSuffixOptions(),instanceOf(List.class));
assertThat(s3ScanScanOptions.getBuckets().get(0).getS3ScanBucketOption().getS3ScanFilter().getS3scanIncludePrefixOptions(),instanceOf(List.class));
assertThat(s3ScanScanOptions.getBuckets().get(0).getS3ScanBucketOption().getS3ScanFilter().getS3scanIncludePrefixOptions().get(0),
equalTo("bucket2"));
assertThat(s3ScanScanOptions.getBuckets().get(0).getS3ScanBucketOption().getS3ScanFilter().getS3ScanExcludeSuffixOptions().get(0),
equalTo(".jpeg"));

assertThat(s3ScanScanOptions.hasValidTimeOptionsWithScheduling(), equalTo(true));
}
}
Loading