Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh committed Sep 17, 2024
1 parent 94a3432 commit 7320d03
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -403,28 +403,15 @@ private boolean isJsonPath(String parameter) {
}

/**
* Calculate s3 folder scan depth for DocDB source pipeline
* Calculate s3 folder scan depth for DocDB and RDS source pipeline
* @param s3Prefix: s3 prefix defined in the source configuration
* @return s3 folder scan depth
*/
public String calculateDepth(String s3Prefix) {
return Integer.toString(getDepth(s3Prefix, 4));
}

/**
* Calculate s3 folder scan depth for RDS source pipeline
* @param s3Prefix: s3 prefix defined in the source configuration
* @return s3 folder scan depth
*/
public String calculateDepthForRdsSource(String s3Prefix) {
return Integer.toString(getDepth(s3Prefix, 3));
}

private int getDepth(String s3Prefix, int baseDepth) {
if(s3Prefix == null){
return baseDepth;
if (s3Prefix == null) {
return Integer.toString(4);
}
return s3Prefix.split("/").length + baseDepth;
return Integer.toString(s3Prefix.split("/").length + 4);
}

public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){
Expand All @@ -443,7 +430,7 @@ public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){
public String getIncludePrefixForRdsSource(String s3Prefix) {
String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
if (s3Prefix == null && envSourceCoordinationIdentifier == null) {
return "";
return S3_BUFFER_PREFIX;
} else if (s3Prefix == null) {
return envSourceCoordinationIdentifier + S3_BUFFER_PREFIX;
} else if (envSourceCoordinationIdentifier == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import software.amazon.awssdk.services.rds.RdsClient;
import software.amazon.awssdk.services.s3.S3Client;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -162,7 +163,7 @@ private String getS3PathPrefix() {

final String s3PathPrefix;
if (sourceCoordinator.getPartitionPrefix() != null ) {
s3PathPrefix = s3UserPathPrefix + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix();
s3PathPrefix = s3UserPathPrefix + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix() + S3_PATH_DELIMITER + Instant.now().toEpochMilli();
} else {
s3PathPrefix = s3UserPathPrefix;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class BinlogEventListener implements BinaryLogClient.EventListener {

static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
static final String DATA_PREPPER_EVENT_TYPE = "event";
static final String CHANGE_EVENTS_PROCESSED_COUNT = "changeEventsProcessed";
static final String CHANGE_EVENTS_PROCESSING_ERROR_COUNT = "changeEventsProcessingErrors";
static final String BYTES_RECEIVED = "bytesReceived";
Expand Down Expand Up @@ -247,7 +248,7 @@ private void handleRowChangeEvent(com.github.shyiko.mysql.binlog.event.Event eve
}

final Event dataPrepperEvent = JacksonEvent.builder()
.withEventType("event")
.withEventType(DATA_PREPPER_EVENT_TYPE)
.withData(rowDataMap)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
disable_s3_metadata_in_event: true
scan:
folder_partitions:
depth: "<<FUNCTION_NAME:calculateDepthForRdsSource,PARAMETER:$.<<pipeline-name>>.source.rds.s3_prefix>>"
depth: "<<FUNCTION_NAME:calculateDepth,PARAMETER:$.<<pipeline-name>>.source.rds.s3_prefix>>"
max_objects_per_ownership: 50
buckets:
- bucket:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand All @@ -34,14 +35,18 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.rds.RdsService.S3_PATH_DELIMITER;

@ExtendWith(MockitoExtension.class)
class RdsServiceTest {
Expand Down Expand Up @@ -131,12 +136,21 @@ void test_normal_service_start_when_stream_is_enabled() {
when(sourceConfig.getAuthenticationConfig()).thenReturn(authConfig);
when(sourceConfig.getTlsConfig()).thenReturn(mock(TlsConfig.class));

final String s3Prefix = UUID.randomUUID().toString();
final String partitionPrefix = UUID.randomUUID().toString();
when(sourceConfig.getS3Prefix()).thenReturn(s3Prefix);
when(sourceCoordinator.getPartitionPrefix()).thenReturn(partitionPrefix);

final RdsService rdsService = createObjectUnderTest();
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class)) {
final String[] s3PrefixArray = new String[1];
try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class);
final MockedConstruction<LeaderScheduler> leaderSchedulerMockedConstruction = mockConstruction(LeaderScheduler.class,
(mock, context) -> s3PrefixArray[0] = (String) context.arguments().get(2))) {
executorsMockedStatic.when(() -> Executors.newFixedThreadPool(anyInt())).thenReturn(executor);
rdsService.start(buffer);
}

assertThat(s3PrefixArray[0], startsWith(s3Prefix + S3_PATH_DELIMITER + partitionPrefix + S3_PATH_DELIMITER));
verify(executor).submit(any(LeaderScheduler.class));
verify(executor).submit(any(StreamScheduler.class));
verify(executor, never()).submit(any(ExportScheduler.class));
Expand Down

0 comments on commit 7320d03

Please sign in to comment.