diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index babe6b3d56..54c084c0f8 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -1,2 +1,2 @@
# This should match the owning team set up in https://github.com/orgs/opensearch-project/teams
-* @chenqi0805 @engechas @graytaylor0 @dinujoh @kkondaka @asifsmohammed @KarstenSchnitter @dlvenable @oeyh
\ No newline at end of file
+* @chenqi0805 @engechas @graytaylor0 @dinujoh @kkondaka @KarstenSchnitter @dlvenable @oeyh
\ No newline at end of file
diff --git a/TRIAGING.md b/TRIAGING.md
index a4a25e1932..ba20857061 100644
--- a/TRIAGING.md
+++ b/TRIAGING.md
@@ -19,7 +19,7 @@ However, should we run out of time before your issue is discussed, you are alway
Meetings are hosted regularly Tuesdays at 2:30 PM US Central Time (12:30 PM Pacific Time) and can be joined via the links posted on the [OpenSearch Meetup Group](https://www.meetup.com/opensearch/events/) list of events.
The event will be titled `Data Prepper Triage Meeting`.
-After joining the Zoom meeting, you can enable your video / voice to join the discussion.
+After joining the video meeting, you can enable your video / voice to join the discussion.
If you do not have a webcam or microphone available, you can still join in via the text chat.
If you have an issue you'd like to bring forth please consider getting a link to the issue so it can be presented to everyone in the meeting.
diff --git a/build-resources.gradle b/build-resources.gradle
index b8a7bd5879..446f1d97b2 100644
--- a/build-resources.gradle
+++ b/build-resources.gradle
@@ -14,5 +14,6 @@ ext.coreProjects = [
project(':data-prepper-plugins'),
project(':data-prepper-test-common'),
project(':data-prepper-test-event'),
- project(':data-prepper-plugin-framework')
-]
\ No newline at end of file
+ project(':data-prepper-plugin-framework'),
+ project(':data-prepper-plugin-schema-cli')
+]
diff --git a/build.gradle b/build.gradle
index f77ecc442b..3dccd497cf 100644
--- a/build.gradle
+++ b/build.gradle
@@ -226,6 +226,9 @@ subprojects {
test {
useJUnitPlatform()
+ javaLauncher = javaToolchains.launcherFor {
+ languageVersion = JavaLanguageVersion.current()
+ }
reports {
junitXml.required
html.required
diff --git a/config/checkstyle/checkstyle-suppressions.xml b/config/checkstyle/checkstyle-suppressions.xml
index 42c37e7dd5..ab3ba001a9 100644
--- a/config/checkstyle/checkstyle-suppressions.xml
+++ b/config/checkstyle/checkstyle-suppressions.xml
@@ -14,4 +14,6 @@
+
+
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java
index eaaa978230..874a9d350e 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/Buffer.java
@@ -84,10 +84,24 @@ default boolean isByteBuffer() {
return false;
}
+ /**
+ * returns max request size of an entry in the buffer
+ *
+ * @return Optional value of the buffer's max request size
+ */
default Optional getMaxRequestSize() {
return Optional.empty();
}
+ /**
+ * returns optimal request size of an entry in the buffer
+ *
+ * @return Optional value of the buffer's optimal request size
+ */
+ default Optional getOptimalRequestSize() {
+ return Optional.empty();
+ }
+
/**
* Checks if the buffer enables acknowledgements for the pipeline
*
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineModel.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineModel.java
index 1c8221f899..7af56175a0 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineModel.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PipelineModel.java
@@ -24,25 +24,30 @@
* @since 1.2
*/
public class PipelineModel {
+ public static final String SOURCE_PLUGIN_TYPE = "source";
+ public static final String PROCESSOR_PLUGIN_TYPE = "processor";
+ public static final String BUFFER_PLUGIN_TYPE = "buffer";
+ public static final String ROUTE_PLUGIN_TYPE = "route";
+ public static final String SINK_PLUGIN_TYPE = "sink";
private static final Logger LOG = LoggerFactory.getLogger(PipelineModel.class);
- @JsonProperty("source")
+ @JsonProperty(SOURCE_PLUGIN_TYPE)
private final PluginModel source;
- @JsonProperty("processor")
+ @JsonProperty(PROCESSOR_PLUGIN_TYPE)
@JsonInclude(JsonInclude.Include.NON_NULL)
private final List processors;
- @JsonProperty("buffer")
+ @JsonProperty(BUFFER_PLUGIN_TYPE)
@JsonInclude(JsonInclude.Include.NON_NULL)
private final PluginModel buffer;
@JsonProperty("routes")
- @JsonAlias("route")
+ @JsonAlias(ROUTE_PLUGIN_TYPE)
@JsonInclude(JsonInclude.Include.NON_EMPTY)
private final List routes;
- @JsonProperty("sink")
+ @JsonProperty(SINK_PLUGIN_TYPE)
private final List sinks;
@JsonProperty("workers")
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java
index 1c3e596265..26dd7e98a6 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/AbstractSink.java
@@ -28,6 +28,7 @@ public abstract class AbstractSink> implements Sink {
private Thread retryThread;
private int maxRetries;
private int waitTimeMs;
+ private SinkThread sinkThread;
public AbstractSink(final PluginSetting pluginSetting, int numRetries, int waitTimeMs) {
this.pluginMetrics = PluginMetrics.fromPluginSetting(pluginSetting);
@@ -51,7 +52,8 @@ public void initialize() {
// the exceptions which are not retryable.
doInitialize();
if (!isReady() && retryThread == null) {
- retryThread = new Thread(new SinkThread(this, maxRetries, waitTimeMs));
+ sinkThread = new SinkThread(this, maxRetries, waitTimeMs);
+ retryThread = new Thread(sinkThread);
retryThread.start();
}
}
@@ -76,7 +78,7 @@ public void output(Collection records) {
@Override
public void shutdown() {
if (retryThread != null) {
- retryThread.stop();
+ sinkThread.stop();
}
}
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkThread.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkThread.java
index c304de37af..451cef7dff 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkThread.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkThread.java
@@ -10,6 +10,8 @@ class SinkThread implements Runnable {
private int maxRetries;
private int waitTimeMs;
+ private volatile boolean isStopped = false;
+
public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
this.sink = sink;
this.maxRetries = maxRetries;
@@ -19,11 +21,15 @@ public SinkThread(AbstractSink sink, int maxRetries, int waitTimeMs) {
@Override
public void run() {
int numRetries = 0;
- while (!sink.isReady() && numRetries++ < maxRetries) {
+ while (!sink.isReady() && numRetries++ < maxRetries && !isStopped) {
try {
Thread.sleep(waitTimeMs);
sink.doInitialize();
} catch (InterruptedException e){}
}
}
+
+ public void stop() {
+ isStopped = true;
+ }
}
diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java
index 2236f0ba33..0d9aa51296 100644
--- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java
+++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/buffer/BufferTest.java
@@ -37,6 +37,12 @@ void testMaxRequestSize() {
assertEquals(buffer.getMaxRequestSize(), Optional.empty());
}
+ @Test
+ void testOptimalRequestSize() {
+ final Buffer> buffer = createObjectUnderTest();
+ assertEquals(buffer.getOptimalRequestSize(), Optional.empty());
+ }
+
@Test
void testShutdown() {
final Buffer> buffer = createObjectUnderTest();
diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java
index 3b9fe7c007..8d1af7ea44 100644
--- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java
+++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/AbstractSinkTest.java
@@ -11,15 +11,10 @@
import org.opensearch.dataprepper.metrics.MetricNames;
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
-import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
-import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventHandle;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.mock;
+import org.opensearch.dataprepper.model.event.JacksonEvent;
+import org.opensearch.dataprepper.model.record.Record;
import java.time.Duration;
import java.util.Arrays;
@@ -30,6 +25,12 @@
import java.util.UUID;
import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
class AbstractSinkTest {
private int count;
@@ -71,13 +72,13 @@ void testMetrics() {
}
@Test
- void testSinkNotReady() {
+ void testSinkNotReady() throws InterruptedException {
final String sinkName = "testSink";
final String pipelineName = "pipelineName";
MetricsTestUtil.initMetrics();
PluginSetting pluginSetting = new PluginSetting(sinkName, Collections.emptyMap());
pluginSetting.setPipelineName(pipelineName);
- AbstractSink> abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
+ AbstractSinkNotReadyImpl abstractSink = new AbstractSinkNotReadyImpl(pluginSetting);
abstractSink.initialize();
assertEquals(abstractSink.isReady(), false);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.RUNNABLE);
@@ -87,7 +88,10 @@ void testSinkNotReady() {
await().atMost(Duration.ofSeconds(5))
.until(abstractSink::isReady);
assertEquals(abstractSink.getRetryThreadState(), Thread.State.TERMINATED);
+ int initCountBeforeShutdown = abstractSink.initCount;
abstractSink.shutdown();
+ Thread.sleep(200);
+ assertThat(abstractSink.initCount, equalTo(initCountBeforeShutdown));
}
@Test
diff --git a/data-prepper-core/build.gradle b/data-prepper-core/build.gradle
index 080538c5e4..c939129a1c 100644
--- a/data-prepper-core/build.gradle
+++ b/data-prepper-core/build.gradle
@@ -48,7 +48,6 @@ dependencies {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1'
- testImplementation 'org.apache.logging.log4j:log4j-jpl:2.23.0'
testImplementation testLibs.spring.test
implementation libs.armeria.core
implementation libs.armeria.grpc
@@ -89,8 +88,6 @@ task integrationTest(type: Test) {
classpath = sourceSets.integrationTest.runtimeClasspath
- systemProperty 'log4j.configurationFile', 'src/test/resources/log4j2.properties'
-
filter {
includeTestsMatching '*IT'
}
diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java
index 744105d46d..7d3a73d7a5 100644
--- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java
+++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java
@@ -68,7 +68,7 @@ void simple_pipeline_with_single_record() {
final int numRecords = 1;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
- await().atMost(20000, TimeUnit.MILLISECONDS)
+ await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
@@ -84,7 +84,7 @@ void simple_pipeline_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
- await().atMost(20000, TimeUnit.MILLISECONDS)
+ await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
@@ -99,7 +99,7 @@ void two_pipelines_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
- await().atMost(20000, TimeUnit.MILLISECONDS)
+ await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
@@ -114,7 +114,7 @@ void three_pipelines_with_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
- await().atMost(20000, TimeUnit.MILLISECONDS)
+ await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
@@ -129,7 +129,7 @@ void three_pipelines_with_all_unrouted_records() {
final int numRecords = 2;
inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);
- await().atMost(20000, TimeUnit.MILLISECONDS)
+ await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
assertTrue(inMemorySourceAccessor != null);
assertTrue(inMemorySourceAccessor.getAckReceived() != null);
@@ -145,7 +145,7 @@ void three_pipelines_with_route_and_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);
- await().atMost(20000, TimeUnit.MILLISECONDS)
+ await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
@@ -161,7 +161,7 @@ void three_pipelines_with_default_route_and_multiple_records() {
inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords);
- await().atMost(20000, TimeUnit.MILLISECONDS)
+ await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
@@ -176,7 +176,7 @@ void two_parallel_pipelines_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
- await().atMost(20000, TimeUnit.MILLISECONDS)
+ await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
@@ -191,7 +191,7 @@ void three_pipelines_multi_sink_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
- await().atMost(20000, TimeUnit.MILLISECONDS)
+ await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
@@ -206,7 +206,7 @@ void one_pipeline_three_sinks_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
- await().atMost(20000, TimeUnit.MILLISECONDS)
+ await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
@@ -221,7 +221,7 @@ void one_pipeline_ack_expiry_multiple_records() {
final int numRecords = 100;
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
- await().atMost(20000, TimeUnit.MILLISECONDS)
+ await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
@@ -237,7 +237,7 @@ void one_pipeline_three_sinks_negative_ack_multiple_records() {
inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords);
inMemorySinkAccessor.setResult(false);
- await().atMost(20000, TimeUnit.MILLISECONDS)
+ await().atMost(40000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
List> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/MultiBufferDecorator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/MultiBufferDecorator.java
index eaa6c09491..76440c0c56 100644
--- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/MultiBufferDecorator.java
+++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/MultiBufferDecorator.java
@@ -50,6 +50,12 @@ public Optional getMaxRequestSize() {
return maxRequestSize.isPresent() ? Optional.of(maxRequestSize.getAsInt()) : Optional.empty();
}
+ @Override
+ public Optional getOptimalRequestSize() {
+ OptionalInt optimalRequestSize = allBuffers.stream().filter(b -> b.getOptimalRequestSize().isPresent()).mapToInt(b -> (Integer)b.getOptimalRequestSize().get()).min();
+ return optimalRequestSize.isPresent() ? Optional.of(optimalRequestSize.getAsInt()) : Optional.empty();
+ }
+
@Override
public void shutdown() {
allBuffers.forEach(Buffer::shutdown);
diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/MultiBufferDecoratorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/MultiBufferDecoratorTest.java
index bae4cb763b..896268fcf4 100644
--- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/MultiBufferDecoratorTest.java
+++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/MultiBufferDecoratorTest.java
@@ -231,6 +231,15 @@ void test_getMaxRequestSize() {
assertThat(multiBufferDecorator.getMaxRequestSize(), equalTo(Optional.empty()));
}
+ @Test
+ void test_getOptimalRequestSize() {
+ when(primaryBuffer.getOptimalRequestSize()).thenReturn(Optional.empty());
+ when(secondaryBuffer.getOptimalRequestSize()).thenReturn(Optional.empty());
+
+ final MultiBufferDecorator multiBufferDecorator = createObjectUnderTest(2);
+ assertThat(multiBufferDecorator.getOptimalRequestSize(), equalTo(Optional.empty()));
+ }
+
private MultiBufferDecorator createObjectUnderTest(final int secondaryBufferCount) {
final List secondaryBuffers = IntStream.range(0, secondaryBufferCount)
.mapToObj(i -> secondaryBuffer)
@@ -238,4 +247,4 @@ private MultiBufferDecorator createObjectUnderTest(final int secondaryBufferCoun
return new MultiBufferDecorator(primaryBuffer, secondaryBuffers);
}
-}
\ No newline at end of file
+}
diff --git a/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericRegexMatchOperator.java b/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericRegexMatchOperator.java
index 1154978116..b4d34dadc1 100644
--- a/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericRegexMatchOperator.java
+++ b/data-prepper-expression/src/main/java/org/opensearch/dataprepper/expression/GenericRegexMatchOperator.java
@@ -37,6 +37,8 @@ public int getSymbol() {
@Override
public Boolean evaluate(final Object ... args) {
checkArgument(args.length == 2, displayName + " requires operands length needs to be 2.");
+ if(args[0] == null)
+ return false;
checkArgument(args[0] instanceof String, displayName + " requires left operand to be String.");
checkArgument(args[1] instanceof String, displayName + " requires right operand to be String.");
try {
diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator_ConditionalIT.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator_ConditionalIT.java
index 0bef1a65a0..a8fc7971f3 100644
--- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator_ConditionalIT.java
+++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericExpressionEvaluator_ConditionalIT.java
@@ -35,6 +35,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
class GenericExpressionEvaluator_ConditionalIT {
/**
@@ -145,76 +146,80 @@ private static Stream validExpressionArguments() {
int testStringLength = random.nextInt(10);
String testString = RandomStringUtils.randomAlphabetic(testStringLength);
return Stream.of(
- Arguments.of("true", event("{}"), true),
- Arguments.of("/status_code == 200", event("{\"status_code\": 200}"), true),
- Arguments.of("/status_code == 200", longEvent, true),
- Arguments.of("/status_code != 300", event("{\"status_code\": 200}"), true),
- Arguments.of("/status_code == 200", event("{}"), false),
- Arguments.of("/success == /status_code", event("{\"success\": true, \"status_code\": 200}"), false),
- Arguments.of("/success != /status_code", event("{\"success\": true, \"status_code\": 200}"), true),
- Arguments.of("/part1@part2.part3 != 111", event("{\"success\": true, \"part1@part2.part3\":111, \"status_code\": 200}"), false),
- Arguments.of("/part1.part2@part3 != 111", event("{\"success\": true, \"part1.part2@part3\":222, \"status_code\": 200}"), true),
- Arguments.of("/pi == 3.14159", event("{\"pi\": 3.14159}"), true),
- Arguments.of("/value == 12345.678", event("{\"value\": 12345.678}"), true),
- Arguments.of("/value == 12345.678E12", event("{\"value\": 12345.678E12}"), true),
- Arguments.of("/value == 12345.678e-12", event("{\"value\": 12345.678e-12}"), true),
- Arguments.of("/value == 12345.0000012", event("{\"value\": 12345.0000012}"), true),
- Arguments.of("/value == 12345.00012E6", event("{\"value\": 12345.00012E6}"), true),
- Arguments.of("true == (/is_cool == true)", event("{\"is_cool\": true}"), true),
- Arguments.of("not /is_cool", event("{\"is_cool\": true}"), false),
- Arguments.of("/status_code < 300", event("{\"status_code\": 200}"), true),
- Arguments.of("/status_code != null", event("{\"status_code\": 200}"), true),
- Arguments.of("null != /status_code", event("{\"status_code\": 200}"), true),
- Arguments.of("/status_code == null", event("{\"status_code\": null}"), true),
- Arguments.of("/response == null", event("{\"status_code\": 200}"), true),
- Arguments.of("null == /response", event("{\"status_code\": 200}"), true),
- Arguments.of("/response != null", event("{\"status_code\": 200}"), false),
- Arguments.of("/status_code <= 0", event("{\"status_code\": 200}"), false),
- Arguments.of("/status_code > 0", event("{\"status_code\": 200}"), true),
- Arguments.of("/status_code >= 300", event("{\"status_code\": 200}"), false),
- Arguments.of("-/status_code == -200", event("{\"status_code\": 200}"), true),
- Arguments.of("/success and /status_code == 200", event("{\"success\": true, \"status_code\": 200}"), true),
- Arguments.of("/success or /status_code == 200", event("{\"success\": false, \"status_code\": 200}"), true),
- Arguments.of("(/success == true) or (/status_code == 200)", event("{\"success\": false, \"status_code\": 200}"), true),
- Arguments.of("/should_drop", event("{\"should_drop\": true}"), true),
- Arguments.of("/should_drop", event("{\"should_drop\": false}"), false),
- Arguments.of("/logs/2/should_drop", event("{\"logs\": [{}, {}, {\"should_drop\": true}]}"), true),
- Arguments.of(
+ arguments("true", event("{}"), true),
+ arguments("/status_code == 200", event("{\"status_code\": 200}"), true),
+ arguments("/status_code == 200", longEvent, true),
+ arguments("/status_code != 300", event("{\"status_code\": 200}"), true),
+ arguments("/status_code == 200", event("{}"), false),
+ arguments("/success == /status_code", event("{\"success\": true, \"status_code\": 200}"), false),
+ arguments("/success != /status_code", event("{\"success\": true, \"status_code\": 200}"), true),
+ arguments("/part1@part2.part3 != 111", event("{\"success\": true, \"part1@part2.part3\":111, \"status_code\": 200}"), false),
+ arguments("/part1.part2@part3 != 111", event("{\"success\": true, \"part1.part2@part3\":222, \"status_code\": 200}"), true),
+ arguments("/pi == 3.14159", event("{\"pi\": 3.14159}"), true),
+ arguments("/value == 12345.678", event("{\"value\": 12345.678}"), true),
+ arguments("/value == 12345.678E12", event("{\"value\": 12345.678E12}"), true),
+ arguments("/value == 12345.678e-12", event("{\"value\": 12345.678e-12}"), true),
+ arguments("/value == 12345.0000012", event("{\"value\": 12345.0000012}"), true),
+ arguments("/value == 12345.00012E6", event("{\"value\": 12345.00012E6}"), true),
+ arguments("true == (/is_cool == true)", event("{\"is_cool\": true}"), true),
+ arguments("not /is_cool", event("{\"is_cool\": true}"), false),
+ arguments("/status_code < 300", event("{\"status_code\": 200}"), true),
+ arguments("/status_code != null", event("{\"status_code\": 200}"), true),
+ arguments("null != /status_code", event("{\"status_code\": 200}"), true),
+ arguments("/status_code == null", event("{\"status_code\": null}"), true),
+ arguments("/response == null", event("{\"status_code\": 200}"), true),
+ arguments("null == /response", event("{\"status_code\": 200}"), true),
+ arguments("/response != null", event("{\"status_code\": 200}"), false),
+ arguments("/status_code <= 0", event("{\"status_code\": 200}"), false),
+ arguments("/status_code > 0", event("{\"status_code\": 200}"), true),
+ arguments("/status_code >= 300", event("{\"status_code\": 200}"), false),
+ arguments("-/status_code == -200", event("{\"status_code\": 200}"), true),
+ arguments("/success and /status_code == 200", event("{\"success\": true, \"status_code\": 200}"), true),
+ arguments("/success or /status_code == 200", event("{\"success\": false, \"status_code\": 200}"), true),
+ arguments("(/success == true) or (/status_code == 200)", event("{\"success\": false, \"status_code\": 200}"), true),
+ arguments("/should_drop", event("{\"should_drop\": true}"), true),
+ arguments("/should_drop", event("{\"should_drop\": false}"), false),
+ arguments("/logs/2/should_drop", event("{\"logs\": [{}, {}, {\"should_drop\": true}]}"), true),
+ arguments(
escapedJsonPointer(ALL_JACKSON_EVENT_GET_SUPPORTED_CHARACTERS) + " == true",
complexEvent(ALL_JACKSON_EVENT_GET_SUPPORTED_CHARACTERS, true),
true),
- Arguments.of("/durationInNanos > 5000000000", event("{\"durationInNanos\": 6000000000}"), true),
- Arguments.of("/response == \"OK\"", event("{\"response\": \"OK\"}"), true),
- Arguments.of("length(/response) == "+testStringLength, event("{\"response\": \""+testString+"\"}"), true),
- Arguments.of("hasTags(\""+ testTag1+"\")", longEvent, true),
- Arguments.of("hasTags(\""+ testTag1+"\",\""+testTag2+"\")", longEvent, true),
- Arguments.of("hasTags(\""+ testTag1+"\", \""+testTag2+"\", \""+testTag3+"\")", longEvent, true),
- Arguments.of("hasTags(\""+ testTag4+"\")", longEvent, false),
- Arguments.of("hasTags(\""+ testTag3+"\",\""+testTag4+"\")", longEvent, false),
- Arguments.of("contains(\""+ strValue+"\",\""+strValue.substring(1,5)+"\")", longEvent, true),
- Arguments.of("contains(/status,\""+strValue.substring(0,2)+"\")", event("{\"status\":\""+strValue+"\"}"), true),
- Arguments.of("contains(\""+strValue+strValue+"\",/status)", event("{\"status\":\""+strValue+"\"}"), true),
- Arguments.of("contains(/message,/status)", event("{\"status\":\""+strValue+"\", \"message\":\""+strValue+strValue+"\"}"), true),
- Arguments.of("contains(/unknown,/status)", event("{\"status\":\""+strValue+"\", \"message\":\""+strValue+strValue+"\"}"), false),
- Arguments.of("contains(/status,/unknown)", event("{\"status\":\""+strValue+"\", \"message\":\""+strValue+strValue+"\"}"), false),
- Arguments.of("getMetadata(\"key1\") == \""+strValue+"\"", longEvent, true),
- Arguments.of("getMetadata(\"key2\") == "+value4, longEvent, true),
- Arguments.of("getMetadata(\"key3\") == "+value5, longEvent, true),
- Arguments.of("getMetadata(\"/key1\") == \""+strValue+"\"", longEvent, true),
- Arguments.of("getMetadata(\"/key2\") == "+value4, longEvent, true),
- Arguments.of("getMetadata(\"key3\") == "+value5, longEvent, true),
- Arguments.of("getMetadata(\"/key6\") == \""+value5+"\"", longEvent, false),
- Arguments.of("getMetadata(\"key6\") == "+value5, longEvent, false),
- Arguments.of("cidrContains(/sourceIp,\"192.0.2.0/24\")", event("{\"sourceIp\": \"192.0.2.3\"}"), true),
- Arguments.of("cidrContains(/sourceIp,\"192.0.2.0/24\",\"192.1.1.0/24\")", event("{\"sourceIp\": \"192.0.2.3\"}"), true),
- Arguments.of("cidrContains(/sourceIp,\"192.0.2.0/24\",\"192.1.1.0/24\")", event("{\"sourceIp\": \"192.2.2.3\"}"), false),
- Arguments.of("cidrContains(/sourceIp,\"2001:0db8::/32\")", event("{\"sourceIp\": \"2001:0db8:aaaa:bbbb::\"}"), true),
- Arguments.of("cidrContains(/sourceIp,\"2001:0db8::/32\",\"2001:aaaa::/32\")", event("{\"sourceIp\": \"2001:0db8:aaaa:bbbb::\"}"), true),
- Arguments.of("cidrContains(/sourceIp,\"2001:0db8::/32\",\"2001:aaaa::/32\")", event("{\"sourceIp\": \"2001:abcd:aaaa:bbbb::\"}"), false),
- Arguments.of("/sourceIp != null", event("{\"sourceIp\": [10, 20]}"), true),
- Arguments.of("/sourceIp == null", event("{\"sourceIp\": [\"test\", \"test_two\"]}"), false),
- Arguments.of("/sourceIp == null", event("{\"sourceIp\": {\"test\": \"test_two\"}}"), false),
- Arguments.of("/sourceIp != null", event("{\"sourceIp\": {\"test\": \"test_two\"}}"), true)
+ arguments("/durationInNanos > 5000000000", event("{\"durationInNanos\": 6000000000}"), true),
+ arguments("/response == \"OK\"", event("{\"response\": \"OK\"}"), true),
+ arguments("length(/response) == "+testStringLength, event("{\"response\": \""+testString+"\"}"), true),
+ arguments("hasTags(\""+ testTag1+"\")", longEvent, true),
+ arguments("hasTags(\""+ testTag1+"\",\""+testTag2+"\")", longEvent, true),
+ arguments("hasTags(\""+ testTag1+"\", \""+testTag2+"\", \""+testTag3+"\")", longEvent, true),
+ arguments("hasTags(\""+ testTag4+"\")", longEvent, false),
+ arguments("hasTags(\""+ testTag3+"\",\""+testTag4+"\")", longEvent, false),
+ arguments("contains(\""+ strValue+"\",\""+strValue.substring(1,5)+"\")", longEvent, true),
+ arguments("contains(/status,\""+strValue.substring(0,2)+"\")", event("{\"status\":\""+strValue+"\"}"), true),
+ arguments("contains(\""+strValue+strValue+"\",/status)", event("{\"status\":\""+strValue+"\"}"), true),
+ arguments("contains(/message,/status)", event("{\"status\":\""+strValue+"\", \"message\":\""+strValue+strValue+"\"}"), true),
+ arguments("contains(/unknown,/status)", event("{\"status\":\""+strValue+"\", \"message\":\""+strValue+strValue+"\"}"), false),
+ arguments("contains(/status,/unknown)", event("{\"status\":\""+strValue+"\", \"message\":\""+strValue+strValue+"\"}"), false),
+ arguments("getMetadata(\"key1\") == \""+strValue+"\"", longEvent, true),
+ arguments("getMetadata(\"key2\") == "+value4, longEvent, true),
+ arguments("getMetadata(\"key3\") == "+value5, longEvent, true),
+ arguments("getMetadata(\"/key1\") == \""+strValue+"\"", longEvent, true),
+ arguments("getMetadata(\"/key2\") == "+value4, longEvent, true),
+ arguments("getMetadata(\"key3\") == "+value5, longEvent, true),
+ arguments("getMetadata(\"/key6\") == \""+value5+"\"", longEvent, false),
+ arguments("getMetadata(\"key6\") == "+value5, longEvent, false),
+ arguments("cidrContains(/sourceIp,\"192.0.2.0/24\")", event("{\"sourceIp\": \"192.0.2.3\"}"), true),
+ arguments("cidrContains(/sourceIp,\"192.0.2.0/24\",\"192.1.1.0/24\")", event("{\"sourceIp\": \"192.0.2.3\"}"), true),
+ arguments("cidrContains(/sourceIp,\"192.0.2.0/24\",\"192.1.1.0/24\")", event("{\"sourceIp\": \"192.2.2.3\"}"), false),
+ arguments("cidrContains(/sourceIp,\"2001:0db8::/32\")", event("{\"sourceIp\": \"2001:0db8:aaaa:bbbb::\"}"), true),
+ arguments("cidrContains(/sourceIp,\"2001:0db8::/32\",\"2001:aaaa::/32\")", event("{\"sourceIp\": \"2001:0db8:aaaa:bbbb::\"}"), true),
+ arguments("cidrContains(/sourceIp,\"2001:0db8::/32\",\"2001:aaaa::/32\")", event("{\"sourceIp\": \"2001:abcd:aaaa:bbbb::\"}"), false),
+ arguments("/sourceIp != null", event("{\"sourceIp\": [10, 20]}"), true),
+ arguments("/sourceIp == null", event("{\"sourceIp\": [\"test\", \"test_two\"]}"), false),
+ arguments("/sourceIp == null", event("{\"sourceIp\": {\"test\": \"test_two\"}}"), false),
+ arguments("/sourceIp != null", event("{\"sourceIp\": {\"test\": \"test_two\"}}"), true),
+ arguments("/name =~ \".*dataprepper-[0-9]+\"", event("{\"name\": \"dataprepper-0\"}"), true),
+ arguments("/name =~ \".*dataprepper-[0-9]+\"", event("{\"name\": \"dataprepper-212\"}"), true),
+ arguments("/name =~ \".*dataprepper-[0-9]+\"", event("{\"name\": \"dataprepper-abc\"}"), false),
+ arguments("/name =~ \".*dataprepper-[0-9]+\"", event("{\"other\": \"dataprepper-abc\"}"), false)
);
}
@@ -236,43 +241,43 @@ private static Stream invalidExpressionArguments() {
int testStringLength = random.nextInt(10);
String testString = RandomStringUtils.randomAlphabetic(testStringLength);
return Stream.of(
- Arguments.of("/missing", event("{}")),
- Arguments.of("/success < /status_code", event("{\"success\": true, \"status_code\": 200}")),
- Arguments.of("/success <= /status_code", event("{\"success\": true, \"status_code\": 200}")),
- Arguments.of("/success > /status_code", event("{\"success\": true, \"status_code\": 200}")),
- Arguments.of("/success >= /status_code", event("{\"success\": true, \"status_code\": 200}")),
- Arguments.of("/success > null", event("{\"success\": true, \"status_code\": 200}")),
- Arguments.of("/success >= null", event("{\"success\": true, \"status_code\": 200}")),
- Arguments.of("/status_code < null", event("{\"success\": true, \"status_code\": 200}")),
- Arguments.of("/status_code <= null", event("{\"success\": true, \"status_code\": 200}")),
- Arguments.of("not /status_code", event("{\"status_code\": 200}")),
- Arguments.of("/status_code >= 200 and 3", event("{\"status_code\": 200}")),
- Arguments.of("", event("{}")),
- Arguments.of("-false", event("{}")),
- Arguments.of("not 5", event("{}")),
- Arguments.of("not null", event("{}")),
- Arguments.of("not/status_code", event("{\"status_code\": 200}")),
- Arguments.of("trueand/status_code", event("{\"status_code\": 200}")),
- Arguments.of("trueor/status_code", event("{\"status_code\": 200}")),
- Arguments.of("length(\""+testString+") == "+testStringLength, event("{\"response\": \""+testString+"\"}")),
- Arguments.of("length(\""+testString+"\") == "+testStringLength, event("{\"response\": \""+testString+"\"}")),
- Arguments.of("hasTags(10)", tagEvent),
- Arguments.of("hasTags("+ testTag1+")", tagEvent),
- Arguments.of("hasTags(\""+ testTag1+")", tagEvent),
- Arguments.of("hasTags(\""+ testTag1+"\","+testTag2+"\")", tagEvent),
- Arguments.of("hasTags(,\""+testTag2+"\")", tagEvent),
- Arguments.of("hasTags(\""+testTag2+"\",)", tagEvent),
- Arguments.of("contains(\""+testTag2+"\",)", tagEvent),
- Arguments.of("contains(\""+testTag2+"\")", tagEvent),
- Arguments.of("contains(/intField, /strField)", event("{\"intField\":1234,\"strField\":\"string\"}")),
- Arguments.of("contains(1234, /strField)", event("{\"intField\":1234,\"strField\":\"string\"}")),
- Arguments.of("contains(str, /strField)", event("{\"intField\":1234,\"strField\":\"string\"}")),
- Arguments.of("contains(/strField, 1234)", event("{\"intField\":1234,\"strField\":\"string\"}")),
- Arguments.of("getMetadata(10)", tagEvent),
- Arguments.of("getMetadata("+ testMetadataKey+ ")", tagEvent),
- Arguments.of("getMetadata(\""+ testMetadataKey+")", tagEvent),
- Arguments.of("cidrContains(/sourceIp)", event("{\"sourceIp\": \"192.0.2.3\"}")),
- Arguments.of("cidrContains(/sourceIp,123)", event("{\"sourceIp\": \"192.0.2.3\"}"))
+ arguments("/missing", event("{}")),
+ arguments("/success < /status_code", event("{\"success\": true, \"status_code\": 200}")),
+ arguments("/success <= /status_code", event("{\"success\": true, \"status_code\": 200}")),
+ arguments("/success > /status_code", event("{\"success\": true, \"status_code\": 200}")),
+ arguments("/success >= /status_code", event("{\"success\": true, \"status_code\": 200}")),
+ arguments("/success > null", event("{\"success\": true, \"status_code\": 200}")),
+ arguments("/success >= null", event("{\"success\": true, \"status_code\": 200}")),
+ arguments("/status_code < null", event("{\"success\": true, \"status_code\": 200}")),
+ arguments("/status_code <= null", event("{\"success\": true, \"status_code\": 200}")),
+ arguments("not /status_code", event("{\"status_code\": 200}")),
+ arguments("/status_code >= 200 and 3", event("{\"status_code\": 200}")),
+ arguments("", event("{}")),
+ arguments("-false", event("{}")),
+ arguments("not 5", event("{}")),
+ arguments("not null", event("{}")),
+ arguments("not/status_code", event("{\"status_code\": 200}")),
+ arguments("trueand/status_code", event("{\"status_code\": 200}")),
+ arguments("trueor/status_code", event("{\"status_code\": 200}")),
+ arguments("length(\""+testString+") == "+testStringLength, event("{\"response\": \""+testString+"\"}")),
+ arguments("length(\""+testString+"\") == "+testStringLength, event("{\"response\": \""+testString+"\"}")),
+ arguments("hasTags(10)", tagEvent),
+ arguments("hasTags("+ testTag1+")", tagEvent),
+ arguments("hasTags(\""+ testTag1+")", tagEvent),
+ arguments("hasTags(\""+ testTag1+"\","+testTag2+"\")", tagEvent),
+ arguments("hasTags(,\""+testTag2+"\")", tagEvent),
+ arguments("hasTags(\""+testTag2+"\",)", tagEvent),
+ arguments("contains(\""+testTag2+"\",)", tagEvent),
+ arguments("contains(\""+testTag2+"\")", tagEvent),
+ arguments("contains(/intField, /strField)", event("{\"intField\":1234,\"strField\":\"string\"}")),
+ arguments("contains(1234, /strField)", event("{\"intField\":1234,\"strField\":\"string\"}")),
+ arguments("contains(str, /strField)", event("{\"intField\":1234,\"strField\":\"string\"}")),
+ arguments("contains(/strField, 1234)", event("{\"intField\":1234,\"strField\":\"string\"}")),
+ arguments("getMetadata(10)", tagEvent),
+ arguments("getMetadata("+ testMetadataKey+ ")", tagEvent),
+ arguments("getMetadata(\""+ testMetadataKey+")", tagEvent),
+ arguments("cidrContains(/sourceIp)", event("{\"sourceIp\": \"192.0.2.3\"}")),
+ arguments("cidrContains(/sourceIp,123)", event("{\"sourceIp\": \"192.0.2.3\"}"))
);
}
diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexEqualOperatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexEqualOperatorTest.java
index 46fcdd9ccf..bb92cd1e49 100644
--- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexEqualOperatorTest.java
+++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexEqualOperatorTest.java
@@ -12,6 +12,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.antlr.DataPrepperExpressionParser;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -64,4 +65,9 @@ void testEvalInValidArgType() {
void testEvalInValidPattern() {
assertThrows(IllegalArgumentException.class, () -> objectUnderTest.evaluate("a", "*"));
}
+
+ @Test
+ void evaluate_with_null_lhs_returns_false() {
+ assertThat(objectUnderTest.evaluate(null, "a*"), equalTo(false));
+ }
}
diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexNotEqualOperatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexNotEqualOperatorTest.java
index 221c354eb7..30bc199413 100644
--- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexNotEqualOperatorTest.java
+++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexNotEqualOperatorTest.java
@@ -12,6 +12,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.antlr.DataPrepperExpressionParser;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -64,4 +65,9 @@ void testEvalInValidArgType() {
void testEvalInValidPattern() {
assertThrows(IllegalArgumentException.class, () -> objectUnderTest.evaluate("a", "*"));
}
+
+ @Test
+ void evaluate_with_null_lhs_returns_false() {
+ assertThat(objectUnderTest.evaluate(null, "a*"), equalTo(false));
+ }
}
diff --git a/data-prepper-plugin-schema-cli/README.md b/data-prepper-plugin-schema-cli/README.md
new file mode 100644
index 0000000000..30b0612603
--- /dev/null
+++ b/data-prepper-plugin-schema-cli/README.md
@@ -0,0 +1,13 @@
+# Data Prepper Plugin Schema CLI
+
+This module includes the SDK and CLI for generating schemas for Data Prepper pipeline plugins.
+
+## CLI Usage
+
+```
+./gradlew :data-prepper-plugin-schema-cli:run --args='--plugin_type=processor --plugin_names=grok --output_folder=/path/to/schemas'
+```
+
+* plugin_type: A required parameter specifies type of processor. Valid options are `source`, `buffer`, `processor`, `route`, `sink`.
+* plugin_names: An optional parameter filters the result by plugin names separated by `,`, e.g. `grok,date`.
+* output_folder: An optional parameter to specify the output folder path.
diff --git a/data-prepper-plugin-schema-cli/build.gradle b/data-prepper-plugin-schema-cli/build.gradle
new file mode 100644
index 0000000000..2108fad681
--- /dev/null
+++ b/data-prepper-plugin-schema-cli/build.gradle
@@ -0,0 +1,40 @@
+plugins {
+ id 'data-prepper.publish'
+ id 'application'
+}
+
+application {
+ mainClass = 'org.opensearch.dataprepper.schemas.DataPrepperPluginSchemaExecute'
+}
+
+dependencies {
+ implementation project(':data-prepper-plugins')
+ implementation project(':data-prepper-plugin-framework')
+ implementation 'com.fasterxml.jackson.core:jackson-databind'
+ implementation 'org.reflections:reflections:0.10.2'
+ implementation 'com.github.victools:jsonschema-maven-plugin:4.35.0'
+ implementation 'com.github.victools:jsonschema-generator:4.35.0'
+ implementation 'com.github.victools:jsonschema-module-jackson:4.35.0'
+ implementation 'com.github.victools:jsonschema-module-jakarta-validation:4.35.0'
+ implementation 'javax.inject:javax.inject:1'
+ implementation 'info.picocli:picocli:4.6.1'
+ implementation(libs.spring.core) {
+ exclude group: 'commons-logging', module: 'commons-logging'
+ }
+ implementation(libs.spring.context) {
+ exclude group: 'commons-logging', module: 'commons-logging'
+ }
+ testImplementation(platform("org.junit:junit-bom:5.9.1"))
+ testImplementation("org.junit.jupiter:junit-jupiter")
+}
+
+jacocoTestCoverageVerification {
+ afterEvaluate {
+ classDirectories.from = files(classDirectories.files.collect {
+ fileTree(dir: it, exclude: [
+ // Exclude main class
+ 'org/opensearch/dataprepper/schemas/DataPrepperPluginSchemaExecute.class'
+ ])
+ })
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-plugin-schema-cli/src/main/java/org/opensearch/dataprepper/schemas/DataPrepperPluginSchemaExecute.java b/data-prepper-plugin-schema-cli/src/main/java/org/opensearch/dataprepper/schemas/DataPrepperPluginSchemaExecute.java
new file mode 100644
index 0000000000..a505a013c4
--- /dev/null
+++ b/data-prepper-plugin-schema-cli/src/main/java/org/opensearch/dataprepper/schemas/DataPrepperPluginSchemaExecute.java
@@ -0,0 +1,123 @@
+package org.opensearch.dataprepper.schemas;
+
+import com.github.victools.jsonschema.generator.Module;
+import com.github.victools.jsonschema.generator.OptionPreset;
+import com.github.victools.jsonschema.generator.SchemaVersion;
+import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationModule;
+import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationOption;
+import org.opensearch.dataprepper.schemas.module.CustomJacksonModule;
+import org.reflections.Reflections;
+import org.reflections.scanners.Scanners;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static com.github.victools.jsonschema.module.jackson.JacksonOption.RESPECT_JSONPROPERTY_REQUIRED;
+
+public class DataPrepperPluginSchemaExecute implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(DataPrepperPluginSchemaExecute.class);
+ static final String DEFAULT_PLUGINS_CLASSPATH = "org.opensearch.dataprepper.plugins";
+
+ @CommandLine.Option(names = {"--plugin_type"}, required = true)
+ private String pluginTypeName;
+
+ @CommandLine.Option(names = {"--plugin_names"})
+ private String pluginNames;
+
+ @CommandLine.Option(names = {"--site.url"}, defaultValue = "https://opensearch.org")
+ private String siteUrl;
+ @CommandLine.Option(names = {"--site.baseurl"}, defaultValue = "/docs/latest")
+ private String siteBaseUrl;
+
+ @CommandLine.Option(names = {"--output_folder"})
+ private String folderPath;
+
+ public static void main(String[] args) {
+ final int exitCode = new CommandLine(new DataPrepperPluginSchemaExecute()).execute(args);
+ System.exit(exitCode);
+ }
+
+ @Override
+ public void run() {
+ final List modules = List.of(
+ new CustomJacksonModule(RESPECT_JSONPROPERTY_REQUIRED),
+ new JakartaValidationModule(JakartaValidationOption.NOT_NULLABLE_FIELD_IS_REQUIRED,
+ JakartaValidationOption.INCLUDE_PATTERN_EXPRESSIONS)
+ );
+ final Reflections reflections = new Reflections(new ConfigurationBuilder()
+ .setUrls(ClasspathHelper.forPackage(DEFAULT_PLUGINS_CLASSPATH))
+ .setScanners(Scanners.TypesAnnotated, Scanners.SubTypes));
+ final PluginConfigsJsonSchemaConverter pluginConfigsJsonSchemaConverter = new PluginConfigsJsonSchemaConverter(
+ reflections, new JsonSchemaConverter(modules), siteUrl, siteBaseUrl);
+ final Class> pluginType = pluginConfigsJsonSchemaConverter.pluginTypeNameToPluginType(pluginTypeName);
+ final Map pluginNameToJsonSchemaMap = pluginConfigsJsonSchemaConverter.convertPluginConfigsIntoJsonSchemas(
+ SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON, pluginType);
+ Map filteredPluginNameToJsonSchemaMap;
+ if (pluginNames == null) {
+ filteredPluginNameToJsonSchemaMap = pluginNameToJsonSchemaMap;
+ } else {
+ final Set pluginNamesSet = Set.of(pluginNames.split(","));
+ filteredPluginNameToJsonSchemaMap = pluginNamesSet.stream()
+ .filter(name -> {
+ if (!pluginNameToJsonSchemaMap.containsKey(name)) {
+ LOG.error("plugin name: {} not found", name);
+ return false;
+ }
+ return true;
+ })
+ .collect(Collectors.toMap(
+ Function.identity(),
+ pluginNameToJsonSchemaMap::get
+ ));
+ }
+
+ if (folderPath == null) {
+ writeCollectionToConsole(filteredPluginNameToJsonSchemaMap.values());
+ } else {
+ writeMapToFiles(filteredPluginNameToJsonSchemaMap, folderPath);
+ }
+ }
+
+ private static void writeCollectionToConsole(final Collection values) {
+ values.forEach(System.out::println);
+ }
+
+ private static void writeMapToFiles(final Map map, final String folderPath) {
+ // Ensure the directory exists
+ final Path directory = Paths.get(folderPath);
+ if (!Files.exists(directory)) {
+ try {
+ Files.createDirectories(directory);
+ } catch (IOException e) {
+ System.err.println("Error creating directory: " + e.getMessage());
+ return;
+ }
+ }
+
+ // Iterate through the map and write each entry to a file
+ for (final Map.Entry entry : map.entrySet()) {
+ final String fileName = entry.getKey() + ".json";
+ final Path filePath = directory.resolve(fileName);
+
+ try {
+ Files.write(filePath, entry.getValue().getBytes());
+ System.out.println("Written file: " + filePath);
+ } catch (IOException e) {
+ System.err.println("Error writing file " + fileName + ": " + e.getMessage());
+ }
+ }
+ }
+}
diff --git a/data-prepper-plugin-schema-cli/src/main/java/org/opensearch/dataprepper/schemas/JsonSchemaConverter.java b/data-prepper-plugin-schema-cli/src/main/java/org/opensearch/dataprepper/schemas/JsonSchemaConverter.java
new file mode 100644
index 0000000000..fe08825af4
--- /dev/null
+++ b/data-prepper-plugin-schema-cli/src/main/java/org/opensearch/dataprepper/schemas/JsonSchemaConverter.java
@@ -0,0 +1,52 @@
+package org.opensearch.dataprepper.schemas;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.github.victools.jsonschema.generator.FieldScope;
+import com.github.victools.jsonschema.generator.Module;
+import com.github.victools.jsonschema.generator.OptionPreset;
+import com.github.victools.jsonschema.generator.SchemaGenerator;
+import com.github.victools.jsonschema.generator.SchemaGeneratorConfig;
+import com.github.victools.jsonschema.generator.SchemaGeneratorConfigBuilder;
+import com.github.victools.jsonschema.generator.SchemaGeneratorConfigPart;
+import com.github.victools.jsonschema.generator.SchemaVersion;
+
+import java.util.List;
+
+public class JsonSchemaConverter {
+ static final String DEPRECATED_SINCE_KEY = "deprecated";
+ private final List jsonSchemaGeneratorModules;
+
+ public JsonSchemaConverter(final List jsonSchemaGeneratorModules) {
+ this.jsonSchemaGeneratorModules = jsonSchemaGeneratorModules;
+ }
+
+ public ObjectNode convertIntoJsonSchema(
+ final SchemaVersion schemaVersion, final OptionPreset optionPreset, final Class> clazz)
+ throws JsonProcessingException {
+ final SchemaGeneratorConfigBuilder configBuilder = new SchemaGeneratorConfigBuilder(
+ schemaVersion, optionPreset);
+ loadJsonSchemaGeneratorModules(configBuilder);
+ final SchemaGeneratorConfigPart scopeSchemaGeneratorConfigPart = configBuilder.forFields();
+ overrideInstanceAttributeWithDeprecated(scopeSchemaGeneratorConfigPart);
+
+ final SchemaGeneratorConfig config = configBuilder.build();
+ final SchemaGenerator generator = new SchemaGenerator(config);
+ return generator.generateSchema(clazz);
+ }
+
+ private void loadJsonSchemaGeneratorModules(final SchemaGeneratorConfigBuilder configBuilder) {
+ jsonSchemaGeneratorModules.forEach(configBuilder::with);
+ }
+
+ private void overrideInstanceAttributeWithDeprecated(
+ final SchemaGeneratorConfigPart scopeSchemaGeneratorConfigPart) {
+ scopeSchemaGeneratorConfigPart.withInstanceAttributeOverride((node, field, context) -> {
+ final Deprecated deprecatedAnnotation = field.getAnnotationConsideringFieldAndGetter(
+ Deprecated.class);
+ if (deprecatedAnnotation != null) {
+ node.put(DEPRECATED_SINCE_KEY, deprecatedAnnotation.since());
+ }
+ });
+ }
+}
diff --git a/data-prepper-plugin-schema-cli/src/main/java/org/opensearch/dataprepper/schemas/PluginConfigsJsonSchemaConverter.java b/data-prepper-plugin-schema-cli/src/main/java/org/opensearch/dataprepper/schemas/PluginConfigsJsonSchemaConverter.java
new file mode 100644
index 0000000000..b7f4c1a531
--- /dev/null
+++ b/data-prepper-plugin-schema-cli/src/main/java/org/opensearch/dataprepper/schemas/PluginConfigsJsonSchemaConverter.java
@@ -0,0 +1,135 @@
+package org.opensearch.dataprepper.schemas;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.github.victools.jsonschema.generator.OptionPreset;
+import com.github.victools.jsonschema.generator.SchemaVersion;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
+import org.opensearch.dataprepper.model.buffer.Buffer;
+import org.opensearch.dataprepper.model.configuration.ConditionalRoute;
+import org.opensearch.dataprepper.model.processor.Processor;
+import org.opensearch.dataprepper.model.sink.Sink;
+import org.opensearch.dataprepper.model.source.Source;
+import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.opensearch.dataprepper.model.configuration.PipelineModel.BUFFER_PLUGIN_TYPE;
+import static org.opensearch.dataprepper.model.configuration.PipelineModel.PROCESSOR_PLUGIN_TYPE;
+import static org.opensearch.dataprepper.model.configuration.PipelineModel.ROUTE_PLUGIN_TYPE;
+import static org.opensearch.dataprepper.model.configuration.PipelineModel.SINK_PLUGIN_TYPE;
+import static org.opensearch.dataprepper.model.configuration.PipelineModel.SOURCE_PLUGIN_TYPE;
+
+public class PluginConfigsJsonSchemaConverter {
+ private static final Logger LOG = LoggerFactory.getLogger(PluginConfigsJsonSchemaConverter.class);
+ static final String SITE_URL_PLACEHOLDER = "{{site.url}}";
+ static final String SITE_BASE_URL_PLACEHOLDER = "{{site.baseurl}}";
+ static final String DOCUMENTATION_LINK_KEY = "documentation";
+ static final String PLUGIN_NAME_KEY = "name";
+ static final String PLUGIN_DOCUMENTATION_URL_FORMAT =
+ "%s%s/data-prepper/pipelines/configuration/%s/%s/";
+ static final Map, String> PLUGIN_TYPE_TO_URI_PARAMETER_MAP = Map.of(
+ Source.class, "sources",
+ Processor.class, "processors",
+ ConditionalRoute.class, "processors",
+ Buffer.class, "buffers",
+ Sink.class, "sinks"
+ );
+ static final String CONDITIONAL_ROUTE_PROCESSOR_NAME = "routes";
+ static final Map> PLUGIN_TYPE_NAME_TO_CLASS_MAP = Map.of(
+ SOURCE_PLUGIN_TYPE, Source.class,
+ PROCESSOR_PLUGIN_TYPE, Processor.class,
+ ROUTE_PLUGIN_TYPE, ConditionalRoute.class,
+ BUFFER_PLUGIN_TYPE, Buffer.class,
+ SINK_PLUGIN_TYPE, Sink.class);
+
+ private final String siteUrl;
+ private final String siteBaseUrl;
+ private final Reflections reflections;
+ private final JsonSchemaConverter jsonSchemaConverter;
+
+ public PluginConfigsJsonSchemaConverter(
+ final Reflections reflections,
+ final JsonSchemaConverter jsonSchemaConverter,
+ final String siteUrl,
+ final String siteBaseUrl) {
+ this.reflections = reflections;
+ this.jsonSchemaConverter = jsonSchemaConverter;
+ this.siteUrl = siteUrl == null ? SITE_URL_PLACEHOLDER : siteUrl;
+ this.siteBaseUrl = siteBaseUrl == null ? SITE_BASE_URL_PLACEHOLDER : siteBaseUrl;
+ }
+
+ public Set validPluginTypeNames() {
+ return PLUGIN_TYPE_NAME_TO_CLASS_MAP.keySet();
+ }
+
+ public Class> pluginTypeNameToPluginType(final String pluginTypeName) {
+ final Class> pluginType = PLUGIN_TYPE_NAME_TO_CLASS_MAP.get(pluginTypeName);
+ if (pluginType == null) {
+ throw new IllegalArgumentException(String.format("Invalid plugin type name: %s.", pluginTypeName));
+ }
+ return pluginType;
+ }
+
+ public Map convertPluginConfigsIntoJsonSchemas(
+ final SchemaVersion schemaVersion, final OptionPreset optionPreset, final Class> pluginType) {
+ final Map> nameToConfigClass = scanForPluginConfigs(pluginType);
+ return nameToConfigClass.entrySet().stream()
+ .flatMap(entry -> {
+ final String pluginName = entry.getKey();
+ String value;
+ try {
+ final ObjectNode jsonSchemaNode = jsonSchemaConverter.convertIntoJsonSchema(
+ schemaVersion, optionPreset, entry.getValue());
+ addPluginName(jsonSchemaNode, pluginName);
+ addDocumentationLink(jsonSchemaNode, pluginName, pluginType);
+ value = jsonSchemaNode.toPrettyString();
+ } catch (JsonProcessingException e) {
+ LOG.error("Encountered error retrieving JSON schema for {}", pluginName);
+ return Stream.empty();
+ }
+ return Stream.of(Map.entry(entry.getKey(), value));
+ })
+ .filter(entry -> Objects.nonNull(entry.getValue()))
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ Map.Entry::getValue
+ ));
+ }
+
+ private Map> scanForPluginConfigs(final Class> pluginType) {
+ if (ConditionalRoute.class.equals(pluginType)) {
+ return Map.of(CONDITIONAL_ROUTE_PROCESSOR_NAME, ConditionalRoute.class);
+ }
+ return reflections.getTypesAnnotatedWith(DataPrepperPlugin.class).stream()
+ .map(clazz -> clazz.getAnnotation(DataPrepperPlugin.class))
+ .filter(dataPrepperPlugin -> pluginType.equals(dataPrepperPlugin.pluginType()))
+ .collect(Collectors.toMap(
+ DataPrepperPlugin::name,
+ DataPrepperPlugin::pluginConfigurationType
+ ));
+ }
+
+ private void addDocumentationLink(final ObjectNode jsonSchemaNode,
+ final String pluginName,
+ final Class> pluginType) {
+ jsonSchemaNode.put(DOCUMENTATION_LINK_KEY,
+ String.format(
+ PLUGIN_DOCUMENTATION_URL_FORMAT,
+ siteUrl,
+ siteBaseUrl,
+ PLUGIN_TYPE_TO_URI_PARAMETER_MAP.get(pluginType),
+ pluginName));
+ }
+
+ private void addPluginName(final ObjectNode jsonSchemaNode,
+ final String pluginName) {
+ jsonSchemaNode.put(PLUGIN_NAME_KEY, pluginName);
+ }
+}
diff --git a/data-prepper-plugin-schema-cli/src/main/java/org/opensearch/dataprepper/schemas/module/CustomJacksonModule.java b/data-prepper-plugin-schema-cli/src/main/java/org/opensearch/dataprepper/schemas/module/CustomJacksonModule.java
new file mode 100644
index 0000000000..09c649cc4c
--- /dev/null
+++ b/data-prepper-plugin-schema-cli/src/main/java/org/opensearch/dataprepper/schemas/module/CustomJacksonModule.java
@@ -0,0 +1,31 @@
+package org.opensearch.dataprepper.schemas.module;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import com.github.victools.jsonschema.generator.MemberScope;
+import com.github.victools.jsonschema.module.jackson.JacksonModule;
+import com.github.victools.jsonschema.module.jackson.JacksonOption;
+
+public class CustomJacksonModule extends JacksonModule {
+
+ public CustomJacksonModule() {
+ super();
+ }
+
+ public CustomJacksonModule(JacksonOption... options) {
+ super(options);
+ }
+
+ @Override
+ protected String getPropertyNameOverrideBasedOnJsonPropertyAnnotation(MemberScope, ?> member) {
+ JsonProperty annotation = member.getAnnotationConsideringFieldAndGetter(JsonProperty.class);
+ if (annotation != null) {
+ String nameOverride = annotation.value();
+ // check for invalid overrides
+ if (nameOverride != null && !nameOverride.isEmpty() && !nameOverride.equals(member.getDeclaredName())) {
+ return nameOverride;
+ }
+ }
+ return PropertyNamingStrategies.SNAKE_CASE.nameForField(null, null, member.getName());
+ }
+}
diff --git a/data-prepper-plugin-schema-cli/src/test/java/org/opensearch/dataprepper/schemas/JsonSchemaConverterTest.java b/data-prepper-plugin-schema-cli/src/test/java/org/opensearch/dataprepper/schemas/JsonSchemaConverterTest.java
new file mode 100644
index 0000000000..d5d172f8c0
--- /dev/null
+++ b/data-prepper-plugin-schema-cli/src/test/java/org/opensearch/dataprepper/schemas/JsonSchemaConverterTest.java
@@ -0,0 +1,60 @@
+package org.opensearch.dataprepper.schemas;
+
+import com.fasterxml.jackson.annotation.JsonClassDescription;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.github.victools.jsonschema.generator.Module;
+import com.github.victools.jsonschema.generator.OptionPreset;
+import com.github.victools.jsonschema.generator.SchemaVersion;
+import org.junit.jupiter.api.Test;
+import org.opensearch.dataprepper.schemas.module.CustomJacksonModule;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+class JsonSchemaConverterTest {
+
+ public JsonSchemaConverter createObjectUnderTest(final List modules) {
+ return new JsonSchemaConverter(modules);
+ }
+
+ @Test
+ void testConvertIntoJsonSchemaWithDefaultModules() throws JsonProcessingException {
+ final JsonSchemaConverter jsonSchemaConverter = createObjectUnderTest(Collections.emptyList());
+ final ObjectNode jsonSchemaNode = jsonSchemaConverter.convertIntoJsonSchema(
+ SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON, TestConfig.class);
+ assertThat(jsonSchemaNode, instanceOf(ObjectNode.class));
+ }
+
+ @Test
+ void testConvertIntoJsonSchemaWithCustomJacksonModule() throws JsonProcessingException {
+ final JsonSchemaConverter jsonSchemaConverter = createObjectUnderTest(
+ Collections.singletonList(new CustomJacksonModule()));
+ final ObjectNode jsonSchemaNode = jsonSchemaConverter.convertIntoJsonSchema(
+ SchemaVersion.DRAFT_2020_12, OptionPreset.PLAIN_JSON, TestConfig.class);
+ assertThat(jsonSchemaNode, instanceOf(ObjectNode.class));
+ assertThat(jsonSchemaNode.has("description"), is(true));
+ final JsonNode propertiesNode = jsonSchemaNode.at("/properties");
+ assertThat(propertiesNode, instanceOf(ObjectNode.class));
+ assertThat(propertiesNode.has("test_attribute_with_getter"), is(true));
+ assertThat(propertiesNode.has("custom_test_attribute"), is(true));
+ }
+
+ @JsonClassDescription("test config")
+ static class TestConfig {
+ private String testAttributeWithGetter;
+
+ @JsonProperty("custom_test_attribute")
+ private String testAttributeWithJsonPropertyAnnotation;
+
+ public String getTestAttributeWithGetter() {
+ return testAttributeWithGetter;
+ }
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-plugin-schema-cli/src/test/java/org/opensearch/dataprepper/schemas/PluginConfigsJsonSchemaConverterIT.java b/data-prepper-plugin-schema-cli/src/test/java/org/opensearch/dataprepper/schemas/PluginConfigsJsonSchemaConverterIT.java
new file mode 100644
index 0000000000..71e9bf5faa
--- /dev/null
+++ b/data-prepper-plugin-schema-cli/src/test/java/org/opensearch/dataprepper/schemas/PluginConfigsJsonSchemaConverterIT.java
@@ -0,0 +1,80 @@
+package org.opensearch.dataprepper.schemas;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.victools.jsonschema.generator.Module;
+import com.github.victools.jsonschema.generator.OptionPreset;
+import com.github.victools.jsonschema.generator.SchemaVersion;
+import com.github.victools.jsonschema.module.jackson.JacksonModule;
+import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationModule;
+import com.github.victools.jsonschema.module.jakarta.validation.JakartaValidationOption;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.reflections.Reflections;
+import org.reflections.scanners.Scanners;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static com.github.victools.jsonschema.module.jackson.JacksonOption.RESPECT_JSONPROPERTY_REQUIRED;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.opensearch.dataprepper.schemas.PluginConfigsJsonSchemaConverter.DOCUMENTATION_LINK_KEY;
+import static org.opensearch.dataprepper.schemas.PluginConfigsJsonSchemaConverter.PLUGIN_NAME_KEY;
+
+class PluginConfigsJsonSchemaConverterIT {
+ static final String DEFAULT_PLUGINS_CLASSPATH = "org.opensearch.dataprepper.plugins";
+ private static final String TEST_URL = String.format("https://%s/", UUID.randomUUID());
+ private static final String TEST_BASE_URL = String.format("/%s", UUID.randomUUID());
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ static final TypeReference