diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 047e4c4a07..8b4b2a729f 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -373,19 +373,9 @@ private String trimKey(final String key) { } private boolean isValidKey(final String key) { - char previous = ' '; - char next = ' '; for (int i = 0; i < key.length(); i++) { char c = key.charAt(i); - if (i < key.length() - 1) { - next = key.charAt(i + 1); - } - - if ((i == 0 || i == key.length() - 1 || previous == '/' || next == '/') && (c == '_' || c == '.' || c == '-')) { - return false; - } - if (!(c >= 48 && c <= 57 || c >= 65 && c <= 90 || c >= 97 && c <= 122 @@ -397,7 +387,6 @@ private boolean isValidKey(final String key) { return false; } - previous = c; } return true; } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java index 9de73495f9..92b181ac8c 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java @@ -323,9 +323,8 @@ public void testIsValueAList_withNull() { } @ParameterizedTest - @ValueSource(strings = {"", "withSpecialChars*$%", "-withPrefixDash", "\\-withEscapeChars", "\\\\/withMultipleEscapeChars", - "withDashSuffix-", "withDashSuffix-/nestedKey", "withDashPrefix/-nestedKey", "_withUnderscorePrefix", "withUnderscoreSuffix_", - ".withDotPrefix", "withDotSuffix.", "with,Comma", "with:Colon", "with[Bracket", "with|Brace"}) + @ValueSource(strings = {"", "withSpecialChars*$%", "\\-withEscapeChars", "\\\\/withMultipleEscapeChars", + "with,Comma", "with:Colon", "with[Bracket", "with|Brace"}) void testKey_withInvalidKey_throwsIllegalArgumentException(final String invalidKey) { assertThrowsForKeyCheck(IllegalArgumentException.class, invalidKey); } diff --git a/data-prepper-plugins/opensearch-source/README.md b/data-prepper-plugins/opensearch-source/README.md new file mode 100644 index 0000000000..a86c2e77ec --- /dev/null +++ b/data-prepper-plugins/opensearch-source/README.md @@ -0,0 +1,69 @@ +# OpenSearch Source + +This is the Date Prepper OpenSearch source plugin that processes indices either OpenSearch, Elasticsearch, +or Amazon OpenSearch Service. It is meant for migrating index data from a cluster. + +Note: Only fully tested versions with be listed below. It is likely many more versions are supported already, but it is untested. + +The OpenSearch source is compatible with the following OpenSearch versions: +* 2.5 + +And is compatible with the following Elasticsearch versions: +* 7.10 + +# Usages + +### Amazon OpenSearch Service + +The OpenSearch sink can also be configured for an Amazon OpenSearch Service domain. See [security](security.md) for details. + +```yaml +opensearch-source-pipeline: + source: + opensearch: + connection: + insecure: true + hosts: [ "https://search-my-domain-soopywaovobopgs8ywurr3utsu.us-east-1.es.amazonaws.com" ] + aws: + region: "us-east-1" + sts_role_arn: "arn:aws:iam::123456789012:role/my-domain-role" +``` + +## Configuration + +- `hosts`: A list of IP addresses of OpenSearch or Elasticsearch nodes. + + +- `username`: + + +- `password`: + + +- `aws` (Optional) : AWS configurations. See [AWS Configuration](#aws_configuration) for details. SigV4 is enabled by default when this option is used. + + +- `search_options` (Optional) : See [Search Configuration](#search_configuration) for details + + +- `indices` (Optional): See [Indices Configurations](#indices_configuration) for filtering options. + + +- `scheduling` (Optional): See [Scheduling Configuration](#scheduling_configuration) for details + + +- `connection` (Optional): See [] + +### AWS Configuration + +* `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). +* `sts_role_arn` (Optional) : The STS role to assume for requests to AWS. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). +* `sts_header_overrides` (Optional): A map of header overrides to make when assuming the IAM role for the source plugin. + +### Search Configuration + +### Scheduling Configuration + +### Connection Configuration + +### Indices Configuration \ No newline at end of file diff --git a/data-prepper-plugins/opensearch-source/security.md b/data-prepper-plugins/opensearch-source/security.md new file mode 100644 index 0000000000..e69de29bb2 diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java index 99f04d3e33..e076b7de10 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java @@ -64,34 +64,37 @@ public Collection> doExecute(final Collection> recor final boolean doUsePointer = Objects.nonNull(pointer); for (final Record record : records) { - final Event event = record.getData(); - if (Objects.nonNull(parseWhen) && !expressionEvaluator.evaluateConditional(parseWhen, event)) { - continue; - } - - final String message = event.get(source, String.class); - if (Objects.isNull(message)) { - continue; - } - - try { - final TypeReference> hashMapTypeReference = new TypeReference>() {}; - Map parsedJson = objectMapper.readValue(message, hashMapTypeReference); - - if (doUsePointer) { - parsedJson = parseUsingPointer(event, parsedJson, pointer, doWriteToRoot); - } - - if (doWriteToRoot) { - writeToRoot(event, parsedJson); - } else { - event.put(destination, parsedJson); + final Event event = record.getData(); + try { + if (Objects.nonNull(parseWhen) && !expressionEvaluator.evaluateConditional(parseWhen, event)) { + continue; + } + + final String message = event.get(source, String.class); + if (Objects.isNull(message)) { + continue; + } + final TypeReference> hashMapTypeReference = new TypeReference>() { + }; + Map parsedJson = objectMapper.readValue(message, hashMapTypeReference); + + if (doUsePointer) { + parsedJson = parseUsingPointer(event, parsedJson, pointer, doWriteToRoot); + } + + if (doWriteToRoot) { + writeToRoot(event, parsedJson); + } else { + event.put(destination, parsedJson); + } + } catch (final JsonProcessingException jsonException) { + event.getMetadata().addTags(tagsOnFailure); + LOG.error(EVENT, "An exception occurred due to invalid JSON while reading event [{}]", event, jsonException); + } catch (final Exception e) { + event.getMetadata().addTags(tagsOnFailure); + LOG.error(EVENT, "An exception occurred while using the parse_json processor on Event [{}]", event, e); } - } catch (final JsonProcessingException jsonException) { - event.getMetadata().addTags(tagsOnFailure); - LOG.error(EVENT, "An exception occurred due to invalid JSON while reading event [{}]", event, jsonException); - } } return records; }