Skip to content

Commit

Permalink
Config description changes for aggregate and anomaly detector process…
Browse files Browse the repository at this point in the history
…ors. (#4829)

* dplive1.yaml

Signed-off-by: Krishna Kondaka <[email protected]>

* Delete .github/workflows/static.yml

Signed-off-by: Krishna Kondaka <[email protected]>

* Add json property description for aggregate processor and anomaly detector processors

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed build failure

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka committed Aug 13, 2024
1 parent 2f21a43 commit 1487973
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.configuration.PluginModel;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
Expand All @@ -18,27 +19,34 @@ public class AggregateProcessorConfig {

static int DEFAULT_GROUP_DURATION_SECONDS = 180;

@JsonPropertyDescription("An unordered list by which to group events. Events with the same values as these keys are put into the same group. If an event does not contain one of the identification_keys, then the value of that key is considered to be equal to null. At least one identification_key is required (for example, [\"sourceIp\", \"destinationIp\", \"port\"].")
@JsonProperty("identification_keys")
@NotEmpty
private List<String> identificationKeys;

@JsonPropertyDescription("The amount of time that a group should exist before it is concluded automatically. Supports ISO_8601 notation strings (\"PT20.345S\", \"PT15M\", etc.) as well as simple notation for seconds (\"60s\") and milliseconds (\"1500ms\"). Default value is 180s.")
@JsonProperty("group_duration")
private Duration groupDuration = Duration.ofSeconds(DEFAULT_GROUP_DURATION_SECONDS);

@JsonPropertyDescription("The action to be performed on each group. One of the available aggregate actions must be provided, or you can create custom aggregate actions. remove_duplicates and put_all are the available actions. For more information, see Creating New Aggregate Actions.")
@JsonProperty("action")
@NotNull
private PluginModel aggregateAction;

@JsonPropertyDescription("When local_mode is set to true, the aggregation is performed locally on each Data Prepper node instead of forwarding events to a specific node based on the identification_keys using a hash function. Default is false.")
@JsonProperty("local_mode")
@NotNull
private Boolean localMode = false;

@JsonPropertyDescription("A boolean indicating if the unaggregated events should be forwarded to the next processor/sink in the chain.")
@JsonProperty("output_unaggregated_events")
private Boolean outputUnaggregatedEvents = false;

@JsonPropertyDescription("Tag to be used for aggregated events to distinguish aggregated events from unaggregated events.")
@JsonProperty("aggregated_events_tag")
private String aggregatedEventsTag;

@JsonPropertyDescription("A Data Prepper conditional expression (https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as '/some-key == \"test\"', that will be evaluated to determine whether the processor will be run on the event.")
@JsonProperty("aggregate_when")
private String whenCondition;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import java.util.List;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;

public class AppendAggregateActionConfig {

@JsonPropertyDescription("List of keys to append.")
@JsonProperty("keys_to_append")
List<String> keysToAppend;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.List;
import java.util.Set;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;

public class CountAggregateActionConfig {
static final String SUM_METRIC_NAME = "count";
Expand All @@ -17,21 +18,27 @@ public class CountAggregateActionConfig {
public static final String DEFAULT_END_TIME_KEY = "aggr._end_time";
public static final Set<String> validOutputFormats = new HashSet<>(Set.of(OutputFormat.OTEL_METRICS.toString(), OutputFormat.RAW.toString()));

@JsonPropertyDescription("Key used for storing the count. Default name is aggr._count.")
@JsonProperty("count_key")
String countKey = DEFAULT_COUNT_KEY;

@JsonPropertyDescription("Metric name to be used when otel format is used.")
@JsonProperty("metric_name")
String metricName = SUM_METRIC_NAME;

@JsonPropertyDescription("List of unique keys to count.")
@JsonProperty("unique_keys")
List<String> uniqueKeys = null;

@JsonPropertyDescription("Key used for storing the start time. Default name is aggr._start_time.")
@JsonProperty("start_time_key")
String startTimeKey = DEFAULT_START_TIME_KEY;

@JsonPropertyDescription("Key used for storing the end time. Default name is aggr._end_time.")
@JsonProperty("end_time_key")
String endTimeKey = DEFAULT_END_TIME_KEY;

@JsonPropertyDescription("Format of the aggregated event. otel_metrics is the default output format which outputs in OTel metrics SUM type with count as value. Other options is - raw - which generates a JSON object with the count_key field as a count value and the start_time_key field with aggregation start time as value.")
@JsonProperty("output_format")
String outputFormat = OutputFormat.OTEL_METRICS.toString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.List;
import java.util.HashSet;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import jakarta.validation.constraints.NotNull;

public class HistogramAggregateActionConfig {
Expand All @@ -25,27 +26,34 @@ public class HistogramAggregateActionConfig {
public static final String DURATION_KEY = "duration";
public static final Set<String> validOutputFormats = new HashSet<>(Set.of(OutputFormat.OTEL_METRICS.toString(), OutputFormat.RAW.toString()));

@JsonPropertyDescription("Name of the field in the events the histogram generates.")
@JsonProperty("key")
@NotNull
String key;

@JsonPropertyDescription("The name of units for the values in the key. For example, bytes, traces etc")
@JsonProperty("units")
@NotNull
String units;

@JsonPropertyDescription("Metric name to be used when otel format is used.")
@JsonProperty("metric_name")
String metricName = HISTOGRAM_METRIC_NAME;

@JsonPropertyDescription("Key prefix used by all the fields created in the aggregated event. Having a prefix ensures that the names of the histogram event do not conflict with the field names in the event.")
@JsonProperty("generated_key_prefix")
String generatedKeyPrefix = DEFAULT_GENERATED_KEY_PREFIX;

@JsonPropertyDescription("A list of buckets (values of type double) indicating the buckets in the histogram.")
@JsonProperty("buckets")
@NotNull
List<Number> buckets;

@JsonPropertyDescription("Format of the aggregated event. otel_metrics is the default output format which outputs in OTel metrics SUM type with count as value. Other options is - raw - which generates a JSON object with the count_key field as a count value and the start_time_key field with aggregation start time as value.")
@JsonProperty("output_format")
String outputFormat = OutputFormat.OTEL_METRICS.toString();

@JsonPropertyDescription("A Boolean value indicating whether the histogram should include the min and max of the values in the aggregation.")
@JsonProperty("record_minmax")
boolean recordMinMax = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import jakarta.validation.constraints.AssertTrue;

public class PercentSamplerAggregateActionConfig {
@JsonPropertyDescription("Percent value of the sampling to be done. 0.0 < percent < 100.0")
@JsonProperty("percent")
@NotNull
private double percent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@
import java.util.Set;
import java.util.HashSet;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import jakarta.validation.constraints.NotNull;

public class RateLimiterAggregateActionConfig {
public static final Set<String> validRateLimiterModes = new HashSet<>(Set.of(RateLimiterMode.BLOCK.toString(), RateLimiterMode.DROP.toString()));

@JsonPropertyDescription("The number of events allowed per second.")
@JsonProperty("events_per_second")
@NotNull
int eventsPerSecond;

@JsonPropertyDescription("Indicates what action the rate_limiter takes when the number of events received is greater than the number of events allowed per second. Default value is block, which blocks the processor from running after the maximum number of events allowed per second is reached until the next second. Alternatively, the drop option drops the excess events received in that second. Default is block")
@JsonProperty("when_exceeds")
String whenExceedsMode = RateLimiterMode.BLOCK.toString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,24 @@
package org.opensearch.dataprepper.plugins.processor.aggregate.actions;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.AssertTrue;

import java.time.Duration;

public class TailSamplerAggregateActionConfig {
@JsonPropertyDescription("Period to wait before considering that a trace event is complete")
@JsonProperty("wait_period")
@NotNull
private Duration waitPeriod;

@JsonPropertyDescription("Percent value to use for sampling non error events. 0.0 < percent < 100.0")
@JsonProperty("percent")
@NotNull
private Integer percent;

@JsonPropertyDescription("A Data Prepper conditional expression (https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as '/some-key == \"test\"', that will be evaluated to determine whether the event is an error event or not")
@JsonProperty("condition")
private String condition;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.anomalydetector;

import org.opensearch.dataprepper.model.configuration.PluginModel;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
Expand All @@ -14,20 +15,25 @@
import java.util.List;

public class AnomalyDetectorProcessorConfig {
@JsonPropertyDescription("The ML algorithm (or model) used to detect anomalies. You must provide a mode. See random_cut_forest mode.")
@JsonProperty("mode")
@NotNull
private PluginModel detectorMode;

@JsonPropertyDescription("A non-ordered List<String> that is used as input to the ML algorithm to detect anomalies in the values of the keys in the list. At least one key is required.")
@JsonProperty("keys")
@NotEmpty
private List<String> keys;

@JsonPropertyDescription("If provided, anomalies will be detected within each unique instance of these keys. For example, if you provide the ip field, anomalies will be detected separately for each unique IP address.")
@JsonProperty("identification_keys")
private List<String> identificationKeys = Collections.emptyList();

@JsonPropertyDescription("RCF will try to automatically learn and reduce the number of anomalies detected. For example, if latency is consistently between 50 and 100, and then suddenly jumps to around 1000, only the first one or two data points after the transition will be detected (unless there are other spikes/anomalies). Similarly, for repeated spikes to the same level, RCF will likely eliminate many of the spikes after a few initial ones. This is because the default setting is to minimize the number of alerts detected. Setting the verbose setting to true will cause RCF to consistently detect these repeated cases, which may be useful for detecting anomalous behavior that lasts an extended period of time. Default is false.")
@JsonProperty("verbose")
private Boolean verbose = false;

@JsonPropertyDescription("If using the identification_keys settings, a new ML model will be created for every degree of cardinality. This can cause a large amount of memory usage, so it is helpful to set a limit on the number of models. Default limit is 5000.")
@JsonProperty("cardinality_limit")
private int cardinalityLimit = 5000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.anomalydetector.modes;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import jakarta.validation.constraints.AssertTrue;

import java.util.Set;
Expand All @@ -25,25 +26,31 @@ public class RandomCutForestModeConfig {

public static final String VERSION_1_0 = "1.0";

@JsonPropertyDescription("The algorithm version number. Default is 1.0.")
@JsonProperty("version")
private String version = VERSION_1_0;

public static final Set<String> validVersions = new HashSet<>(Set.of(VERSION_1_0));

@JsonPropertyDescription("The type of data sent to the algorithm. Default is metrics type")
@JsonProperty("type")
private String type = RandomCutForestType.METRICS.toString();

public static final Set<String> validTypes = new HashSet<>(Set.of(RandomCutForestType.METRICS.toString()));

@JsonPropertyDescription("The shingle size used in the ML algorithm. Default is 60.")
@JsonProperty("shingle_size")
private int shingleSize = DEFAULT_SHINGLE_SIZE;

@JsonPropertyDescription("The sample size used in the ML algorithm. Default is 256.")
@JsonProperty("sample_size")
private int sampleSize = DEFAULT_SAMPLE_SIZE;

@JsonPropertyDescription("The time decay value used in the ML algorithm. Used as the mathematical expression timeDecay divided by SampleSize in the ML algorithm. Default is 0.1")
@JsonProperty("time_decay")
private double timeDecay = DEFAULT_TIME_DECAY;

@JsonPropertyDescription("Output after indicates the number of events to consume before outputting anamolies. Default is 32.")
@JsonProperty("output_after")
private int outputAfter = DEFAULT_OUTPUT_AFTER;

Expand Down

0 comments on commit 1487973

Please sign in to comment.