Skip to content

Commit

Permalink
Merge branch 'main' into opensearch-api-async
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <[email protected]>
  • Loading branch information
sb2k16 committed Aug 14, 2024
2 parents 77930fc + 1bfed0d commit 918dfd9
Show file tree
Hide file tree
Showing 169 changed files with 4,400 additions and 475 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# This should match the owning team set up in https://github.com/orgs/opensearch-project/teams
* @chenqi0805 @engechas @graytaylor0 @dinujoh @kkondaka @asifsmohammed @KarstenSchnitter @dlvenable @oeyh
* @chenqi0805 @engechas @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh
2 changes: 1 addition & 1 deletion TRIAGING.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ However, should we run out of time before your issue is discussed, you are alway
Meetings are hosted regularly Tuesdays at 2:30 PM US Central Time (12:30 PM Pacific Time) and can be joined via the links posted on the [OpenSearch Meetup Group](https://www.meetup.com/opensearch/events/) list of events.
The event will be titled `Data Prepper Triage Meeting`.

After joining the Zoom meeting, you can enable your video / voice to join the discussion.
After joining the video meeting, you can enable your video / voice to join the discussion.
If you do not have a webcam or microphone available, you can still join in via the text chat.

If you have an issue you'd like to bring forth please consider getting a link to the issue so it can be presented to everyone in the meeting.
Expand Down
5 changes: 3 additions & 2 deletions build-resources.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ ext.coreProjects = [
project(':data-prepper-plugins'),
project(':data-prepper-test-common'),
project(':data-prepper-test-event'),
project(':data-prepper-plugin-framework')
]
project(':data-prepper-plugin-framework'),
project(':data-prepper-plugin-schema-cli')
]
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ subprojects {

test {
useJUnitPlatform()
javaLauncher = javaToolchains.launcherFor {
languageVersion = JavaLanguageVersion.current()
}
reports {
junitXml.required
html.required
Expand Down
2 changes: 2 additions & 0 deletions config/checkstyle/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@
<suppress files="data-prepper-expression[\\/]build[\\/]generated-src[\\/]antlr[\\/]main[\\/]*" checks="[a-zA-Z0-9]*"/>
<!-- The stdout sink must call System.out.println -->
<suppress files="data-prepper-plugins[\\/]common[\\/]src[\\/]main[\\/]java[\\/]org[\\/]opensearch[\\/]dataprepper[\\/]plugins[\\/]sink[\\/]StdOutSink.java" checks="Regexp"/>
<!-- The DataPrepperPluginSchemaExecute must call System.out.println -->
<suppress files="data-prepper-plugin-schema-cli[\\/]src[\\/]main[\\/]java[\\/]org[\\/]opensearch[\\/]dataprepper[\\/]schemas[\\/]DataPrepperPluginSchemaExecute.java" checks="Regexp"/>
</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,24 @@ default boolean isByteBuffer() {
return false;
}

/**
* returns max request size of an entry in the buffer
*
* @return Optional value of the buffer's max request size
*/
default Optional<Integer> getMaxRequestSize() {
return Optional.empty();
}

/**
* returns optimal request size of an entry in the buffer
*
* @return Optional value of the buffer's optimal request size
*/
default Optional<Integer> getOptimalRequestSize() {
return Optional.empty();
}

/**
* Checks if the buffer enables acknowledgements for the pipeline
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,30 @@
* @since 1.2
*/
public class PipelineModel {
public static final String SOURCE_PLUGIN_TYPE = "source";
public static final String PROCESSOR_PLUGIN_TYPE = "processor";
public static final String BUFFER_PLUGIN_TYPE = "buffer";
public static final String ROUTE_PLUGIN_TYPE = "route";
public static final String SINK_PLUGIN_TYPE = "sink";
private static final Logger LOG = LoggerFactory.getLogger(PipelineModel.class);

@JsonProperty("source")
@JsonProperty(SOURCE_PLUGIN_TYPE)
private final PluginModel source;

@JsonProperty("processor")
@JsonProperty(PROCESSOR_PLUGIN_TYPE)
@JsonInclude(JsonInclude.Include.NON_NULL)
private final List<PluginModel> processors;

@JsonProperty("buffer")
@JsonProperty(BUFFER_PLUGIN_TYPE)
@JsonInclude(JsonInclude.Include.NON_NULL)
private final PluginModel buffer;

@JsonProperty("routes")
@JsonAlias("route")
@JsonAlias(ROUTE_PLUGIN_TYPE)
@JsonInclude(JsonInclude.Include.NON_EMPTY)
private final List<ConditionalRoute> routes;

@JsonProperty("sink")
@JsonProperty(SINK_PLUGIN_TYPE)
private final List<SinkModel> sinks;

@JsonProperty("workers")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public abstract class AbstractSink<T extends Record<?>> implements Sink<T> {
private Thread retryThread;
private int maxRetries;
private int waitTimeMs;
private SinkThread sinkThread;

public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitTimeMs) {
this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
Expand All @@ -51,7 +52,8 @@ public void initialize() {
// the exceptions which are not retryable.
doInitialize();
if (!isReady() && retryThread == null) {
retryThread = new Thread(new SinkThread(this, maxRetries, waitTimeMs));
sinkThread = new SinkThread(this, maxRetries, waitTimeMs);
retryThread = new Thread(sinkThread);
retryThread.start();
}
}
Expand All @@ -76,7 +78,7 @@ public void output(Collection<T> records) {
@Override
public void shutdown() {
if (retryThread != null) {
retryThread.stop();
sinkThread.stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class SinkThread implements Runnable {
private int maxRetries;
private int waitTimeMs;

private volatile boolean isStopped = false;

public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
this.sink = sink;
this.maxRetries = maxRetries;
Expand All @@ -19,11 +21,15 @@ public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
@Override
public void run() {
int numRetries = 0;
while (!sink.isReady() && numRetries++ < maxRetries) {
while (!sink.isReady() && numRetries++ < maxRetries && !isStopped) {
try {
Thread.sleep(waitTimeMs);
sink.doInitialize();
} catch (InterruptedException e){}
}
}

public void stop() {
isStopped = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ void testMaxRequestSize() {
assertEquals(buffer.getMaxRequestSize(), Optional.empty());
}

@Test
void testOptimalRequestSize() {
final Buffer<Record<Event>> buffer = createObjectUnderTest();
assertEquals(buffer.getOptimalRequestSize(), Optional.empty());
}

@Test
void testShutdown() {
final Buffer<Record<Event>> buffer = createObjectUnderTest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,10 @@
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventHandle;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Arrays;
Expand All @@ -30,6 +25,12 @@
import java.util.UUID;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class AbstractSinkTest {
private int count;
Expand Down Expand Up @@ -71,13 +72,13 @@ void testMetrics() {
}

@Test
void testSinkNotReady() {
void testSinkNotReady() throws InterruptedException {
final String sinkName = "testSink";
final String pipelineName = "pipelineName";
MetricsTestUtil.initMetrics();
PluginSetting pluginSetting = new PluginSetting(sinkName, Collections.emptyMap());
pluginSetting.setPipelineName(pipelineName);
AbstractSink<Record<String>> abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
AbstractSinkNotReadyImpl abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
abstractSink.initialize();
assertEquals(abstractSink.isReady(), false);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.RUNNABLE);
Expand All @@ -87,7 +88,10 @@ void testSinkNotReady() {
await().atMost(Duration.ofSeconds(5))
.until(abstractSink::isReady);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.TERMINATED);
int initCountBeforeShutdown = abstractSink.initCount;
abstractSink.shutdown();
Thread.sleep(200);
assertThat(abstractSink.initCount, equalTo(initCountBeforeShutdown));
}

@Test
Expand Down
3 changes: 0 additions & 3 deletions data-prepper-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ dependencies {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1'
testImplementation 'org.apache.logging.log4j:log4j-jpl:2.23.0'
testImplementation testLibs.spring.test
implementation libs.armeria.core
implementation libs.armeria.grpc
Expand Down Expand Up @@ -89,8 +88,6 @@ task integrationTest(type: Test) {

classpath = sourceSets.integrationTest.runtimeClasspath

systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'

filter {
includeTestsMatching '*IT'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void simple_pipeline_with_single_record() {
final int numRecords = 1;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -84,7 +84,7 @@ void simple_pipeline_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -99,7 +99,7 @@ void two_pipelines_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -114,7 +114,7 @@ void three_pipelines_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -129,7 +129,7 @@ void three_pipelines_with_all_unrouted_records() {
final int numRecords = 2;
inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
assertTrue(inMemorySourceAccessor != null);
assertTrue(inMemorySourceAccessor.getAckReceived() != null);
Expand All @@ -145,7 +145,7 @@ void three_pipelines_with_route_and_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -161,7 +161,7 @@ void three_pipelines_with_default_route_and_multiple_records() {

inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -176,7 +176,7 @@ void two_parallel_pipelines_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -191,7 +191,7 @@ void three_pipelines_multi_sink_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -206,7 +206,7 @@ void one_pipeline_three_sinks_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -221,7 +221,7 @@ void one_pipeline_ack_expiry_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand All @@ -237,7 +237,7 @@ void one_pipeline_three_sinks_negative_ack_multiple_records() {
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySinkAccessor.setResult(false);

await().atMost(20000, TimeUnit.MILLISECONDS)
await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public Optional<Integer> getMaxRequestSize() {
return maxRequestSize.isPresent() ? Optional.of(maxRequestSize.getAsInt()) : Optional.empty();
}

@Override
public Optional<Integer> getOptimalRequestSize() {
OptionalInt optimalRequestSize = allBuffers.stream().filter(b -> b.getOptimalRequestSize().isPresent()).mapToInt(b -> (Integer)b.getOptimalRequestSize().get()).min();
return optimalRequestSize.isPresent() ? Optional.of(optimalRequestSize.getAsInt()) : Optional.empty();
}

@Override
public void shutdown() {
allBuffers.forEach(Buffer::shutdown);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,20 @@ void test_getMaxRequestSize() {
assertThat(multiBufferDecorator.getMaxRequestSize(), equalTo(Optional.empty()));
}

@Test
void test_getOptimalRequestSize() {
when(primaryBuffer.getOptimalRequestSize()).thenReturn(Optional.empty());
when(secondaryBuffer.getOptimalRequestSize()).thenReturn(Optional.empty());

final MultiBufferDecorator multiBufferDecorator = createObjectUnderTest(2);
assertThat(multiBufferDecorator.getOptimalRequestSize(), equalTo(Optional.empty()));
}

private MultiBufferDecorator createObjectUnderTest(final int secondaryBufferCount) {
final List<Buffer> secondaryBuffers = IntStream.range(0, secondaryBufferCount)
.mapToObj(i -> secondaryBuffer)
.collect(Collectors.toList());

return new MultiBufferDecorator(primaryBuffer, secondaryBuffers);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public int getSymbol() {
@Override
public Boolean evaluate(final Object ... args) {
checkArgument(args.length == 2, displayName + " requires operands length needs to be 2.");
if(args[0] == null)
return false;
checkArgument(args[0] instanceof String, displayName + " requires left operand to be String.");
checkArgument(args[1] instanceof String, displayName + " requires right operand to be String.");
try {
Expand Down
Loading

0 comments on commit 918dfd9

Please sign in to comment.