Skip to content

Commit

Permalink
Revert changes on prefix
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh committed Sep 17, 2024
1 parent 7320d03 commit 554ebc9
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -403,15 +403,28 @@ private boolean isJsonPath(String parameter) {
}

/**
* Calculate s3 folder scan depth for DocDB and RDS source pipeline
* Calculate s3 folder scan depth for DocDB source pipeline
* @param s3Prefix: s3 prefix defined in the source configuration
* @return s3 folder scan depth
*/
public String calculateDepth(String s3Prefix) {
if (s3Prefix == null) {
return Integer.toString(4);
return Integer.toString(getDepth(s3Prefix, 4));
}

/**
* Calculate s3 folder scan depth for RDS source pipeline
* @param s3Prefix: s3 prefix defined in the source configuration
* @return s3 folder scan depth
*/
public String calculateDepthForRdsSource(String s3Prefix) {
return Integer.toString(getDepth(s3Prefix, 3));
}

private int getDepth(String s3Prefix, int baseDepth) {
if(s3Prefix == null){
return baseDepth;
}
return Integer.toString(s3Prefix.split("/").length + 4);
return s3Prefix.split("/").length + baseDepth;
}

public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import software.amazon.awssdk.services.rds.RdsClient;
import software.amazon.awssdk.services.s3.S3Client;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -163,7 +162,7 @@ private String getS3PathPrefix() {

final String s3PathPrefix;
if (sourceCoordinator.getPartitionPrefix() != null ) {
s3PathPrefix = s3UserPathPrefix + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix() + S3_PATH_DELIMITER + Instant.now().toEpochMilli();
s3PathPrefix = s3UserPathPrefix + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix();
} else {
s3PathPrefix = s3UserPathPrefix;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
disable_s3_metadata_in_event: true
scan:
folder_partitions:
depth: "<<FUNCTION_NAME:calculateDepth,PARAMETER:$.<<pipeline-name>>.source.rds.s3_prefix>>"
depth: "<<FUNCTION_NAME:calculateDepthForRdsSource,PARAMETER:$.<<pipeline-name>>.source.rds.s3_prefix>>"
max_objects_per_ownership: 50
buckets:
- bucket:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.concurrent.Executors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
Expand Down Expand Up @@ -150,7 +150,7 @@ void test_normal_service_start_when_stream_is_enabled() {
rdsService.start(buffer);
}

assertThat(s3PrefixArray[0], startsWith(s3Prefix + S3_PATH_DELIMITER + partitionPrefix + S3_PATH_DELIMITER));
assertThat(s3PrefixArray[0], equalTo(s3Prefix + S3_PATH_DELIMITER + partitionPrefix));
verify(executor).submit(any(LeaderScheduler.class));
verify(executor).submit(any(StreamScheduler.class));
verify(executor, never()).submit(any(ExportScheduler.class));
Expand Down

0 comments on commit 554ebc9

Please sign in to comment.