Skip to content

Commit

Permalink
Support forecast tasks in profile API; enable index field modifications
Browse files Browse the repository at this point in the history
Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Sep 26, 2024
1 parent cbe1f33 commit b706288
Show file tree
Hide file tree
Showing 32 changed files with 1,078 additions and 102 deletions.
4 changes: 0 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -696,15 +696,11 @@ List<String> jacocoExclusions = [

// TODO: add test coverage (kaituo)
'org.opensearch.forecast.*',
'org.opensearch.ad.transport.ADHCImputeNodeResponse',
'org.opensearch.ad.transport.GetAnomalyDetectorTransportAction',
'org.opensearch.timeseries.transport.BooleanNodeResponse',
'org.opensearch.timeseries.ml.TimeSeriesSingleStreamCheckpointDao',
'org.opensearch.timeseries.transport.JobRequest',
'org.opensearch.timeseries.transport.handler.ResultBulkIndexingHandler',
'org.opensearch.timeseries.ml.Inferencer',
'org.opensearch.timeseries.transport.SingleStreamResultRequest',
'org.opensearch.timeseries.transport.BooleanResponse',
'org.opensearch.timeseries.rest.handler.IndexJobActionHandler.1',
'org.opensearch.timeseries.transport.SuggestConfigParamResponse',
'org.opensearch.timeseries.transport.SuggestConfigParamRequest',
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/opensearch/ad/model/ADTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
detector.getCustomResultIndexMinSize(),
detector.getCustomResultIndexMinAge(),
detector.getCustomResultIndexTTL(),
detector.getFlattenResultIndexMapping()
detector.getFlattenResultIndexMapping(),
detector.getLastBreakingUIChangeTime()
);
return new Builder()
.taskId(parsedTaskId)
Expand Down
17 changes: 14 additions & 3 deletions src/main/java/org/opensearch/ad/model/AnomalyDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ public Integer getShingleSize(Integer customShingleSize) {
* @param customResultIndexMinAge custom result index lifecycle management min age condition
* @param customResultIndexTTL custom result index lifecycle management ttl
* @param flattenResultIndexMapping flag to indicate whether to flatten result index mapping or not
* @param lastBreakingUIChangeTime last update time to configuration that can break UI and we have
* to display updates from the changed time
*/
public AnomalyDetector(
String detectorId,
Expand Down Expand Up @@ -178,7 +180,8 @@ public AnomalyDetector(
Integer customResultIndexMinSize,
Integer customResultIndexMinAge,
Integer customResultIndexTTL,
Boolean flattenResultIndexMapping
Boolean flattenResultIndexMapping,
Instant lastBreakingUIChangeTime
) {
super(
detectorId,
Expand Down Expand Up @@ -206,7 +209,8 @@ public AnomalyDetector(
customResultIndexMinSize,
customResultIndexMinAge,
customResultIndexTTL,
flattenResultIndexMapping
flattenResultIndexMapping,
lastBreakingUIChangeTime
);

checkAndThrowValidationErrors(ValidationAspect.DETECTOR);
Expand Down Expand Up @@ -284,6 +288,7 @@ public AnomalyDetector(StreamInput input) throws IOException {
this.customResultIndexMinAge = input.readOptionalInt();
this.customResultIndexTTL = input.readOptionalInt();
this.flattenResultIndexMapping = input.readOptionalBoolean();
this.lastUIBreakingChangeTime = input.readOptionalInstant();
}

public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
Expand Down Expand Up @@ -350,6 +355,7 @@ public void writeTo(StreamOutput output) throws IOException {
output.writeOptionalInt(customResultIndexMinAge);
output.writeOptionalInt(customResultIndexTTL);
output.writeOptionalBoolean(flattenResultIndexMapping);
output.writeOptionalInstant(lastUIBreakingChangeTime);
}

@Override
Expand Down Expand Up @@ -447,6 +453,7 @@ public static AnomalyDetector parse(
Integer customResultIndexMinAge = null;
Integer customResultIndexTTL = null;
Boolean flattenResultIndexMapping = null;
Instant lastBreakingUIChangeTime = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -584,6 +591,9 @@ public static AnomalyDetector parse(
case FLATTEN_RESULT_INDEX_MAPPING:
flattenResultIndexMapping = onlyParseBooleanValue(parser);
break;
case BREAKING_UI_CHANGE_TIME:
lastBreakingUIChangeTime = ParseUtils.toInstant(parser);
break;
default:
parser.skipChildren();
break;
Expand Down Expand Up @@ -615,7 +625,8 @@ public static AnomalyDetector parse(
customResultIndexMinSize,
customResultIndexMinAge,
customResultIndexTTL,
flattenResultIndexMapping
flattenResultIndexMapping,
lastBreakingUIChangeTime
);
detector.setDetectionDateRange(detectionDateRange);
return detector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
: WriteRequest.RefreshPolicy.IMMEDIATE;
RestRequest.Method method = request.getHttpRequest().method();

if (method == RestRequest.Method.POST && detectorId != AnomalyDetector.NO_ID) {
// reset detector to empty string detectorId is only meant for updating detector
detectorId = AnomalyDetector.NO_ID;
}

IndexAnomalyDetectorRequest indexAnomalyDetectorRequest = new IndexAnomalyDetectorRequest(
detectorId,
seqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ protected AnomalyDetector copyConfig(User user, Config config) {
config.getCustomResultIndexMinSize(),
config.getCustomResultIndexMinAge(),
config.getCustomResultIndexTTL(),
config.getFlattenResultIndexMapping()
config.getFlattenResultIndexMapping(),
breakingUIChange ? Instant.now() : config.getLastBreakingUIChangeTime()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,18 @@ public class ForecastTaskProfileRunner implements TaskProfileRunner<ForecastTask

@Override
public void getTaskProfile(ForecastTask configLevelTask, ActionListener<ForecastTaskProfile> listener) {
// return null since forecasting have no in-memory task profiles as AD
listener.onResponse(null);
// return null in other fields since forecasting have no in-memory task profiles as AD
listener
.onResponse(
new ForecastTaskProfile(
configLevelTask,
null,
null,
null,
configLevelTask == null ? null : configLevelTask.getTaskId(),
null
)
);
}

}
9 changes: 6 additions & 3 deletions src/main/java/org/opensearch/forecast/model/ForecastTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ public static ForecastTask parse(XContentParser parser, String taskId) throws IO
forecaster.getCustomResultIndexMinSize(),
forecaster.getCustomResultIndexMinAge(),
forecaster.getCustomResultIndexTTL(),
forecaster.getFlattenResultIndexMapping()
forecaster.getFlattenResultIndexMapping(),
forecaster.getLastBreakingUIChangeTime()
);
return new Builder()
.taskId(parsedTaskId)
Expand Down Expand Up @@ -375,10 +376,12 @@ public static ForecastTask parse(XContentParser parser, String taskId) throws IO
@Generated
@Override
public boolean equals(Object other) {
if (this == other)
if (this == other) {
return true;
if (other == null || getClass() != other.getClass())
}
if (other == null || getClass() != other.getClass()) {
return false;
}
ForecastTask that = (ForecastTask) other;
return super.equals(that)
&& Objects.equal(getForecaster(), that.getForecaster())
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/org/opensearch/forecast/model/Forecaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public Forecaster(
Integer customResultIndexMinSize,
Integer customResultIndexMinAge,
Integer customResultIndexTTL,
Boolean flattenResultIndexMapping
Boolean flattenResultIndexMapping,
Instant lastBreakingUIChangeTime
) {
super(
forecasterId,
Expand Down Expand Up @@ -163,7 +164,8 @@ public Forecaster(
customResultIndexMinSize,
customResultIndexMinAge,
customResultIndexTTL,
flattenResultIndexMapping
flattenResultIndexMapping,
lastBreakingUIChangeTime
);

checkAndThrowValidationErrors(ValidationAspect.FORECASTER);
Expand Down Expand Up @@ -306,6 +308,7 @@ public static Forecaster parse(
Integer customResultIndexMinAge = null;
Integer customResultIndexTTL = null;
Boolean flattenResultIndexMapping = null;
Instant lastBreakingUIChangeTime = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -437,6 +440,9 @@ public static Forecaster parse(
case FLATTEN_RESULT_INDEX_MAPPING:
flattenResultIndexMapping = parser.booleanValue();
break;
case BREAKING_UI_CHANGE_TIME:
lastBreakingUIChangeTime = ParseUtils.toInstant(parser);
break;
default:
parser.skipChildren();
break;
Expand Down Expand Up @@ -468,7 +474,8 @@ public static Forecaster parse(
customResultIndexMinSize,
customResultIndexMinAge,
customResultIndexTTL,
flattenResultIndexMapping
flattenResultIndexMapping,
lastBreakingUIChangeTime
);
return forecaster;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
: WriteRequest.RefreshPolicy.IMMEDIATE;
RestRequest.Method method = request.getHttpRequest().method();

if (method == RestRequest.Method.POST && forecasterId != Config.NO_ID) {
// reset detector to empty string detectorId is only meant for updating detector
forecasterId = Config.NO_ID;
}

IndexForecasterRequest indexAnomalyDetectorRequest = new IndexForecasterRequest(
forecasterId,
seqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ protected Config copyConfig(User user, Config config) {
config.getCustomResultIndexMinSize(),
config.getCustomResultIndexMinAge(),
config.getCustomResultIndexTTL(),
config.getFlattenResultIndexMapping()
config.getFlattenResultIndexMapping(),
breakingUIChange ? Instant.now() : config.getLastBreakingUIChangeTime()
);
}

Expand Down
19 changes: 18 additions & 1 deletion src/main/java/org/opensearch/timeseries/model/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public abstract class Config implements Writeable, ToXContentObject {
public static final String RESULT_INDEX_FIELD_MIN_AGE = "result_index_min_age";
public static final String RESULT_INDEX_FIELD_TTL = "result_index_ttl";
public static final String FLATTEN_RESULT_INDEX_MAPPING = "flatten_result_index_mapping";
// Changing categorical field, feature attributes, interval, windowDelay, time field, horizon, indices,
// result index would force us to display results only from the most recent update. Otherwise,
// the UI appear cluttered and unclear.
// We cannot use last update time as it would change whenever other fields like name changes.
public static final String BREAKING_UI_CHANGE_TIME = "last_ui_breaking_change_time";

protected String id;
protected Long version;
Expand Down Expand Up @@ -120,6 +125,7 @@ public abstract class Config implements Writeable, ToXContentObject {
protected Integer customResultIndexMinAge;
protected Integer customResultIndexTTL;
protected Boolean flattenResultIndexMapping;
protected Instant lastUIBreakingChangeTime;

public static String INVALID_RESULT_INDEX_NAME_SIZE = "Result index name size must contains less than "
+ MAX_RESULT_INDEX_NAME_SIZE
Expand Down Expand Up @@ -151,7 +157,8 @@ protected Config(
Integer customResultIndexMinSize,
Integer customResultIndexMinAge,
Integer customResultIndexTTL,
Boolean flattenResultIndexMapping
Boolean flattenResultIndexMapping,
Instant lastBreakingUIChangeTime
) {
if (Strings.isBlank(name)) {
errorMessage = CommonMessages.EMPTY_NAME;
Expand Down Expand Up @@ -291,6 +298,7 @@ protected Config(
this.customResultIndexMinAge = Strings.trimToNull(resultIndex) == null ? null : customResultIndexMinAge;
this.customResultIndexTTL = Strings.trimToNull(resultIndex) == null ? null : customResultIndexTTL;
this.flattenResultIndexMapping = Strings.trimToNull(resultIndex) == null ? null : flattenResultIndexMapping;
this.lastUIBreakingChangeTime = lastBreakingUIChangeTime;
}

public int suggestHistory() {
Expand Down Expand Up @@ -335,6 +343,7 @@ public Config(StreamInput input) throws IOException {
this.customResultIndexMinAge = input.readOptionalInt();
this.customResultIndexTTL = input.readOptionalInt();
this.flattenResultIndexMapping = input.readOptionalBoolean();
this.lastUIBreakingChangeTime = input.readOptionalInstant();
}

/*
Expand Down Expand Up @@ -388,6 +397,7 @@ public void writeTo(StreamOutput output) throws IOException {
output.writeOptionalInt(customResultIndexMinAge);
output.writeOptionalInt(customResultIndexTTL);
output.writeOptionalBoolean(flattenResultIndexMapping);
output.writeOptionalInstant(lastUIBreakingChangeTime);
}

public boolean invalidShingleSizeRange(Integer shingleSizeToTest) {
Expand Down Expand Up @@ -525,6 +535,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (flattenResultIndexMapping != null) {
builder.field(FLATTEN_RESULT_INDEX_MAPPING, flattenResultIndexMapping);
}
if (lastUIBreakingChangeTime != null) {
builder.field(BREAKING_UI_CHANGE_TIME, lastUIBreakingChangeTime.toEpochMilli());
}
return builder;
}

Expand Down Expand Up @@ -737,6 +750,10 @@ public Boolean getFlattenResultIndexMapping() {
return flattenResultIndexMapping;
}

public Instant getLastBreakingUIChangeTime() {
return lastUIBreakingChangeTime;
}

/**
* Identifies redundant feature names.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public abstract class AbstractTimeSeriesActionHandler<T extends ActionResponse,
protected final Clock clock;
protected final Settings settings;
protected final ValidationAspect configValidationAspect;
protected boolean breakingUIChange;

public AbstractTimeSeriesActionHandler(
Config config,
Expand Down Expand Up @@ -203,6 +204,7 @@ public AbstractTimeSeriesActionHandler(
this.settings = settings;
this.handler = new ConfigUpdateConfirmer<>(taskManager, transportService);
this.configValidationAspect = configValidationAspect;
this.breakingUIChange = false;
}

/**
Expand Down Expand Up @@ -456,6 +458,11 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S
);
return;
}
} else {
if (!ParseUtils.listEqualsWithoutConsideringOrder(existingConfig.getCategoryFields(), config.getCategoryFields())
|| !Objects.equals(existingConfig.getCustomResultIndexOrAlias(), config.getCustomResultIndexOrAlias())) {
breakingUIChange = true;
}
}

ActionListener<Void> confirmBatchRunningListener = ActionListener
Expand Down Expand Up @@ -675,7 +682,6 @@ protected void validateCategoricalField(
);
}

@SuppressWarnings("unchecked")
protected void searchConfigInputIndices(String configId, boolean indexingDryRun, ActionListener<T> listener) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(QueryBuilders.matchAllQuery())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public boolean isAnswerTrue() {

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(answer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public boolean isAnswerTrue() {

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(answer);
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/mappings/anomaly-detection-state.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"dynamic": false,
"_meta": {
"schema_version": 4
"schema_version": 5
},
"properties": {
"schema_version": {
Expand Down
6 changes: 5 additions & 1 deletion src/main/resources/mappings/config.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"dynamic": false,
"_meta": {
"schema_version": 6
"schema_version": 7
},
"properties": {
"schema_version": {
Expand Down Expand Up @@ -232,6 +232,10 @@
},
"flatten_result_index_mapping": {
"type": "boolean"
},
"last_ui_breaking_change_time" : {
"type": "date",
"format": "strict_date_time||epoch_millis"
}
}
}
Loading

0 comments on commit b706288

Please sign in to comment.