forked from opensearch-project/data-prepper
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Dissect Processor (opensearch-project#3363)
* Added Dissect Processor Functionality Signed-off-by: Vishal Boinapalli <[email protected]> * Fixed checkstyle issue Signed-off-by: Vishal Boinapalli <[email protected]> * Tweak readme and a unit test Signed-off-by: Hai Yan <[email protected]> * Fix build failures Signed-off-by: Hai Yan <[email protected]> * Address review comments - separate unit tests for dissector from processor; add delimiter and fieldhelper tests Signed-off-by: Hai Yan <[email protected]> --------- Signed-off-by: Vishal Boinapalli <[email protected]> Signed-off-by: Hai Yan <[email protected]> Co-authored-by: Vishal Boinapalli <[email protected]>
- Loading branch information
1 parent
f2593a9
commit fee636c
Showing
18 changed files
with
1,421 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
# Dissect Processor | ||
|
||
The Dissect processor is useful when dealing with log files or messages that have a known pattern or structure. It extracts specific pieces of information from the text and map them to individual fields based on the defined Dissect patterns. | ||
|
||
|
||
## Basic Usage | ||
|
||
To get started with dissect processor using Data Prepper, create the following `pipeline.yaml`. | ||
```yaml | ||
dissect-pipeline: | ||
source: | ||
file: | ||
path: "/full/path/to/dissect_logs_json.log" | ||
record_type: "event" | ||
format: "json" | ||
processor: | ||
- dissect: | ||
map: | ||
log: "%{Date} %{Time} %{Log_Type}: %{Message}" | ||
sink: | ||
- stdout: | ||
``` | ||
Create the following file named `dissect_logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file. | ||
|
||
``` | ||
{"log": "07-25-2023 10:00:00 ERROR: Some error"} | ||
``` | ||
The Dissect processor will retrieve the necessary fields from the `log` message, such as `Date`, `Time`, `Log_Type`, and `Message`, with the help of the pattern `%{Date} %{Time} %{Type}: %{Message}`, configured in the pipeline. | ||
When you run Data Prepper with this `pipeline.yaml` passed in, you should see the following standard output. | ||
``` | ||
{ | ||
"log" : "07-25-2023 10:00:00 ERROR: Some error", | ||
"Date" : "07-25-2023" | ||
"Time" : "10:00:00" | ||
"Log_Type" : "ERROR" | ||
"Message" : "Some error" | ||
} | ||
``` | ||
The fields `Date`, `Time`, `Log_Type`, and `Message` have been extracted from `log` value. | ||
## Configuration | ||
* `map` (Required): `map` is required to specify the dissect patterns. It takes a `Map<String, String>` with fields as keys and respective dissect patterns as values. | ||
* `target_types` (Optional): A `Map<String, String>` that specifies what the target type of specific field should be. Valid options are `integer`, `double`, `string`, and `boolean`. By default, all the values are `string`. Target types will be changed after the dissection process. | ||
* `dissect_when` (Optional): A Data Prepper Expression string following the [Data Prepper Expression syntax](../../docs/expression_syntax.md). When configured, the processor will evaluate the expression before proceeding with the dissection process and perform the dissection if the expression evaluates to `true`. | ||
## Field Notations | ||
Symbols like `?, +, ->, /, &` can be used to perform logical extraction of data. | ||
* **Normal Field** : The field without a suffix or prefix. The field will be directly added to the output Event. | ||
Ex: `%{field_name}` | ||
* **Skip Field** : ? can be used as a prefix to key to skip that field in the output JSON. | ||
* Skip Field : `%{}` | ||
* Named skip field : `%{?field_name}` | ||
* **Append Field** : To append multiple values and put the final value in the field, we can use + before the field name in the dissect pattern | ||
* **Usage**: | ||
Pattern : "%{+field_name}, %{+field_name}" | ||
Text : "foo, bar" | ||
Output : {"field_name" : "foobar"} | ||
We can also define the order the concatenation with the help of suffix `/<digits>` . | ||
* **Usage**: | ||
Pattern : "%{+field_name/2}, %{+field_name/1}" | ||
Text : "foo, bar" | ||
Output : {"field_name" : "barfoo"} | ||
If the order is not mentioned, the append operation will take place in the order of fields specified in the dissect pattern.<br><br> | ||
* **Indirect Field** : While defining a pattern, prefix the field with a `&` to assign the value found with this field to the value of another field found as the key. | ||
* **Usage**: | ||
Pattern : "%{?field_name}, %{&field_name}" | ||
Text: "foo, bar" | ||
Output : {“foo” : “bar”} | ||
Here we can see that `foo` which was captured from the skip field `%{?field_name}` is made the key to value captured form the field `%{&field_name}` | ||
* **Usage**: | ||
Pattern : %{field_name}, %{&field_name} | ||
Text: "foo, bar" | ||
Output : {“field_name”:“foo”, “foo”:“bar”} | ||
We can also indirectly assign the value to an appended field, along with `normal` field and `skip` field. | ||
### Padding | ||
* `->` operator can be used as a suffix to a field to indicate that white spaces after this field can be ignored. | ||
* **Usage**: | ||
Pattern : %{field1→} %{field2} | ||
Text : “firstname lastname” | ||
Output : {“field1” : “firstname”, “field2” : “lastname”} | ||
* This operator should be used as the right most suffix. | ||
* **Usage**: | ||
Pattern : %{fieldname/1->} %{fieldname/2} | ||
If we use `->` before `/<digit>`, the `->` operator will also be considered part of the field name. | ||
## Developer Guide | ||
This plugin is compatible with Java 14. See | ||
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) | ||
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
plugins { | ||
id 'java' | ||
} | ||
|
||
|
||
dependencies { | ||
implementation project(':data-prepper-api') | ||
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.15.0' | ||
implementation 'io.micrometer:micrometer-core' | ||
implementation project(path: ':data-prepper-api') | ||
implementation project(path: ':data-prepper-plugins:mutate-event-processors') | ||
testImplementation project(':data-prepper-plugins:log-generator-source') | ||
testImplementation project(':data-prepper-test-common') | ||
implementation 'org.apache.commons:commons-lang3:3.12.0' | ||
} | ||
|
||
test { | ||
useJUnitPlatform() | ||
} |
56 changes: 56 additions & 0 deletions
56
...ocessor/src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/Delimiter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.processor.dissect; | ||
|
||
public class Delimiter { | ||
private final String delimiterString; | ||
private int start = -1; | ||
private int end = -1; | ||
private Delimiter next = null; | ||
|
||
private Delimiter prev = null; | ||
|
||
public Delimiter(String delimiterString) { | ||
this.delimiterString = delimiterString; | ||
} | ||
|
||
public int getStart() { | ||
return start; | ||
} | ||
|
||
public void setStart(int ind) { | ||
start = ind; | ||
} | ||
|
||
public int getEnd() { | ||
return end; | ||
} | ||
|
||
public void setEnd(int ind) { | ||
end = ind; | ||
} | ||
|
||
public Delimiter getNext() { | ||
return next; | ||
} | ||
|
||
public void setNext(Delimiter nextDelimiter) { | ||
next = nextDelimiter; | ||
} | ||
|
||
public Delimiter getPrev() { | ||
return prev; | ||
} | ||
|
||
public void setPrev(Delimiter prevDelimiter) { | ||
prev = prevDelimiter; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return delimiterString; | ||
} | ||
} |
122 changes: 122 additions & 0 deletions
122
.../src/main/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.dataprepper.plugins.processor.dissect; | ||
|
||
import org.opensearch.dataprepper.expression.ExpressionEvaluator; | ||
import org.opensearch.dataprepper.metrics.PluginMetrics; | ||
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; | ||
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; | ||
import org.opensearch.dataprepper.model.event.Event; | ||
import org.opensearch.dataprepper.model.processor.AbstractProcessor; | ||
import org.opensearch.dataprepper.model.processor.Processor; | ||
import org.opensearch.dataprepper.model.record.Record; | ||
import org.opensearch.dataprepper.plugins.processor.dissect.Fields.Field; | ||
import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType; | ||
import org.opensearch.dataprepper.typeconverter.TypeConverter; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; | ||
|
||
|
||
@DataPrepperPlugin(name = "dissect", pluginType = Processor.class, pluginConfigurationType = DissectProcessorConfig.class) | ||
public class DissectProcessor extends AbstractProcessor<Record<Event>, Record<Event>> { | ||
private static final Logger LOG = LoggerFactory.getLogger(DissectProcessor.class); | ||
private final DissectProcessorConfig dissectConfig; | ||
private final Map<String, Dissector> dissectorMap = new HashMap<>(); | ||
private final Map<String, TargetType> targetTypeMap; | ||
private final ExpressionEvaluator expressionEvaluator; | ||
|
||
@DataPrepperPluginConstructor | ||
public DissectProcessor(PluginMetrics pluginMetrics, final DissectProcessorConfig dissectConfig, final ExpressionEvaluator expressionEvaluator) { | ||
super(pluginMetrics); | ||
this.dissectConfig = dissectConfig; | ||
this.expressionEvaluator = expressionEvaluator; | ||
this.targetTypeMap = dissectConfig.getTargetTypes(); | ||
|
||
Map<String, String> patternsMap = dissectConfig.getMap(); | ||
for (String key : patternsMap.keySet()) { | ||
Dissector dissector = new Dissector(patternsMap.get(key)); | ||
dissectorMap.put(key, dissector); | ||
} | ||
|
||
} | ||
|
||
@Override | ||
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) { | ||
for (final Record<Event> record : records) { | ||
Event event = record.getData(); | ||
String dissectWhen = dissectConfig.getDissectWhen(); | ||
if (Objects.nonNull(dissectWhen) && !expressionEvaluator.evaluateConditional(dissectWhen, event)) { | ||
continue; | ||
} | ||
try{ | ||
for(String field: dissectorMap.keySet()){ | ||
if(event.containsKey(field)){ | ||
dissectField(event, field); | ||
} | ||
} | ||
} | ||
catch (Exception ex){ | ||
LOG.error(EVENT, "Error dissecting the event [{}] ", record.getData(), ex); | ||
} | ||
} | ||
return records; | ||
} | ||
|
||
private void dissectField(Event event, String field){ | ||
Dissector dissector = dissectorMap.get(field); | ||
String text = event.get(field, String.class); | ||
if (dissector.dissectText(text)) { | ||
List<Field> dissectedFields = dissector.getDissectedFields(); | ||
for(Field disectedField: dissectedFields) { | ||
String dissectFieldName = disectedField.getKey(); | ||
Object dissectFieldValue = convertTargetType(dissectFieldName,disectedField.getValue()); | ||
event.put(disectedField.getKey(), dissectFieldValue); | ||
} | ||
} | ||
} | ||
|
||
private Object convertTargetType(String fieldKey, String fieldValue){ | ||
if(targetTypeMap == null){ | ||
return fieldValue; | ||
} | ||
try{ | ||
if(targetTypeMap.containsKey(fieldKey)){ | ||
TypeConverter converter = targetTypeMap.get(fieldKey).getTargetConverter(); | ||
return converter.convert(fieldValue); | ||
} else { | ||
return fieldValue; | ||
} | ||
} catch (NumberFormatException ex){ | ||
LOG.error("Unable to convert [{}] to the target type mentioned", fieldKey); | ||
return fieldValue; | ||
} | ||
} | ||
|
||
|
||
|
||
@Override | ||
public void prepareForShutdown() { | ||
|
||
} | ||
|
||
@Override | ||
public boolean isReadyForShutdown() { | ||
return true; | ||
} | ||
|
||
@Override | ||
public void shutdown() { | ||
|
||
} | ||
} |
28 changes: 28 additions & 0 deletions
28
...ain/java/org/opensearch/dataprepper/plugins/processor/dissect/DissectProcessorConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
package org.opensearch.dataprepper.plugins.processor.dissect; | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import jakarta.validation.constraints.NotNull; | ||
import org.opensearch.dataprepper.plugins.processor.mutateevent.TargetType; | ||
|
||
import java.util.Map; | ||
|
||
public class DissectProcessorConfig { | ||
@NotNull | ||
@JsonProperty("map") | ||
private Map<String, String> map; | ||
@JsonProperty("target_types") | ||
private Map<String, TargetType> targetTypes; | ||
@JsonProperty("dissect_when") | ||
private String dissectWhen; | ||
|
||
public String getDissectWhen(){ | ||
return dissectWhen; | ||
} | ||
|
||
public Map<String, String> getMap() { | ||
return map; | ||
} | ||
|
||
public Map<String, TargetType> getTargetTypes() { return targetTypes; } | ||
|
||
} |
Oops, something went wrong.