diff --git a/.github/workflows/test_build_multi_platform.yml b/.github/workflows/test_build_multi_platform.yml index 624afc725..03c683e3e 100644 --- a/.github/workflows/test_build_multi_platform.yml +++ b/.github/workflows/test_build_multi_platform.yml @@ -52,11 +52,6 @@ jobs: - name: Multi Nodes Integration Testing run: | ./gradlew integTest -PnumNodes=3 - - name: Upload Coverage Report - uses: codecov/codecov-action@v3 - with: - file: ./build/reports/jacoco/test/jacocoTestReport.xml - flags: plugin Build-ad-linux: needs: [Get-CI-Image-Tag, spotless] @@ -93,11 +88,6 @@ jobs: ./gradlew build -x spotlessJava && ./gradlew publishToMavenLocal && ./gradlew integTest -PnumNodes=3" - - name: Upload Coverage Report - uses: codecov/codecov-action@v3 - with: - file: ./build/reports/jacoco/test/jacocoTestReport.xml - flags: plugin Build-ad-macos: needs: spotless @@ -127,14 +117,15 @@ jobs: - name: Build and Run Tests run: | ./gradlew build -x spotlessJava + # coverage.gradle is only applied in single local node test + - name: Upload Coverage Report + uses: codecov/codecov-action@v3 + with: + file: ./build/reports/jacoco/test/jacocoTestReport.xml + flags: plugin - name: Publish to Maven Local run: | ./gradlew publishToMavenLocal - name: Multi Nodes Integration Testing run: | ./gradlew integTest -PnumNodes=3 - - name: Upload Coverage Report - uses: codecov/codecov-action@v3 - with: - file: ./build/reports/jacoco/test/jacocoTestReport.xml - flags: plugin diff --git a/src/main/java/org/opensearch/ad/constant/ADCommonMessages.java b/src/main/java/org/opensearch/ad/constant/ADCommonMessages.java index d782c5e4c..dadd2921b 100644 --- a/src/main/java/org/opensearch/ad/constant/ADCommonMessages.java +++ b/src/main/java/org/opensearch/ad/constant/ADCommonMessages.java @@ -25,7 +25,6 @@ public class ADCommonMessages { public static String DETECTOR_IS_RUNNING = "Detector is already running"; public static String DETECTOR_MISSING = "Detector is missing"; public static String AD_TASK_ACTION_MISSING = "AD task action is missing"; - public static final String INDEX_NOT_FOUND = "index does not exist"; public static final String UNSUPPORTED_PROFILE_TYPE = "Unsupported profile types"; public static final String REQUEST_THROTTLED_MSG = "Request throttled. Please try again later."; diff --git a/src/main/java/org/opensearch/ad/rest/handler/ADModelValidationActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/ADModelValidationActionHandler.java index 78a1dfbe9..35f3c5834 100644 --- a/src/main/java/org/opensearch/ad/rest/handler/ADModelValidationActionHandler.java +++ b/src/main/java/org/opensearch/ad/rest/handler/ADModelValidationActionHandler.java @@ -17,6 +17,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.timeseries.AnalysisType; import org.opensearch.timeseries.feature.SearchFeatureDao; +import org.opensearch.timeseries.model.ValidationIssueType; import org.opensearch.timeseries.rest.handler.ModelValidationActionHandler; import org.opensearch.timeseries.transport.ValidateConfigResponse; import org.opensearch.timeseries.util.SecurityClientUtil; @@ -50,7 +51,8 @@ public ADModelValidationActionHandler( clock, settings, user, - AnalysisType.AD + AnalysisType.AD, + ValidationIssueType.DETECTION_INTERVAL ); } diff --git a/src/main/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportAction.java b/src/main/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportAction.java index db43e038c..1b27a4349 100644 --- a/src/main/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportAction.java @@ -28,6 +28,7 @@ import org.opensearch.rest.RestRequest; import org.opensearch.timeseries.feature.SearchFeatureDao; import org.opensearch.timeseries.model.Config; +import org.opensearch.timeseries.model.ValidationAspect; import org.opensearch.timeseries.rest.handler.Processor; import org.opensearch.timeseries.transport.BaseValidateConfigTransportAction; import org.opensearch.timeseries.transport.ValidateConfigRequest; @@ -61,7 +62,8 @@ public ValidateAnomalyDetectorTransportAction( actionFilters, transportService, searchFeatureDao, - AD_FILTER_BY_BACKEND_ROLES + AD_FILTER_BY_BACKEND_ROLES, + ValidationAspect.DETECTOR ); } diff --git a/src/main/java/org/opensearch/forecast/rest/handler/ForecastModelValidationActionHandler.java b/src/main/java/org/opensearch/forecast/rest/handler/ForecastModelValidationActionHandler.java index f03c1fdc7..f06021e1f 100644 --- a/src/main/java/org/opensearch/forecast/rest/handler/ForecastModelValidationActionHandler.java +++ b/src/main/java/org/opensearch/forecast/rest/handler/ForecastModelValidationActionHandler.java @@ -17,6 +17,7 @@ import org.opensearch.forecast.model.Forecaster; import org.opensearch.timeseries.AnalysisType; import org.opensearch.timeseries.feature.SearchFeatureDao; +import org.opensearch.timeseries.model.ValidationIssueType; import org.opensearch.timeseries.rest.handler.ModelValidationActionHandler; import org.opensearch.timeseries.transport.ValidateConfigResponse; import org.opensearch.timeseries.util.SecurityClientUtil; @@ -50,7 +51,8 @@ public ForecastModelValidationActionHandler( clock, settings, user, - AnalysisType.FORECAST + AnalysisType.FORECAST, + ValidationIssueType.FORECAST_INTERVAL ); } diff --git a/src/main/java/org/opensearch/forecast/transport/ValidateForecasterTransportAction.java b/src/main/java/org/opensearch/forecast/transport/ValidateForecasterTransportAction.java index 01b38ffa6..781488dc2 100644 --- a/src/main/java/org/opensearch/forecast/transport/ValidateForecasterTransportAction.java +++ b/src/main/java/org/opensearch/forecast/transport/ValidateForecasterTransportAction.java @@ -22,6 +22,7 @@ import org.opensearch.rest.RestRequest; import org.opensearch.timeseries.feature.SearchFeatureDao; import org.opensearch.timeseries.model.Config; +import org.opensearch.timeseries.model.ValidationAspect; import org.opensearch.timeseries.rest.handler.Processor; import org.opensearch.timeseries.transport.BaseValidateConfigTransportAction; import org.opensearch.timeseries.transport.ValidateConfigRequest; @@ -55,7 +56,8 @@ public ValidateForecasterTransportAction( actionFilters, transportService, searchFeatureDao, - FORECAST_FILTER_BY_BACKEND_ROLES + FORECAST_FILTER_BY_BACKEND_ROLES, + ValidationAspect.FORECASTER ); } diff --git a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java index 996ea4dc0..330833a7e 100644 --- a/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java +++ b/src/main/java/org/opensearch/timeseries/constant/CommonMessages.java @@ -54,9 +54,9 @@ public static String getTooManyCategoricalFieldErr(int limit) { public static String TIME_FIELD_NOT_ENOUGH_HISTORICAL_DATA = "There isn't enough historical data found with current timefield selected."; public static String CATEGORY_FIELD_TOO_SPARSE = - "Data is most likely too sparse with the given category fields. Consider revising category field/s or ingesting more data "; + "Data is most likely too sparse with the given category fields. Consider revising category field/s or ingesting more data."; public static String WINDOW_DELAY_REC = - "Latest seen data point is at least %d minutes ago, consider changing window delay to at least %d minutes."; + "Latest seen data point is at least %d minutes ago. Consider changing window delay to at least %d minutes."; public static String INTERVAL_REC = "The selected interval might collect sparse data. Consider changing interval length to: "; public static String RAW_DATA_TOO_SPARSE = "Source index data is potentially too sparse for model training. Consider changing interval length or ingesting more data"; @@ -65,13 +65,14 @@ public static String getTooManyCategoricalFieldErr(int limit) { public static String CATEGORY_FIELD_NO_DATA = "No entity was found with the given categorical fields. Consider revising category field/s or ingesting more data"; public static String FEATURE_QUERY_TOO_SPARSE = - "Data is most likely too sparse when given feature queries are applied. Consider revising feature queries."; + "Data is most likely too sparse when given feature queries are applied. Consider revising feature queries"; public static String TIMEOUT_ON_INTERVAL_REC = "Timed out getting interval recommendation"; public static final String NOT_EXISTENT_VALIDATION_TYPE = "The given validation type doesn't exist"; public static final String NOT_EXISTENT_SUGGEST_TYPE = "The given suggest type doesn't exist"; public static final String DESCRIPTION_LENGTH_TOO_LONG = "Description length is too long. Max length is " + TimeSeriesSettings.MAX_DESCRIPTION_LENGTH + " characters."; + public static final String INDEX_NOT_FOUND = "index does not exist"; // ====================================== // Index message diff --git a/src/main/java/org/opensearch/timeseries/feature/SearchFeatureDao.java b/src/main/java/org/opensearch/timeseries/feature/SearchFeatureDao.java index ea53bfc87..89ad90926 100644 --- a/src/main/java/org/opensearch/timeseries/feature/SearchFeatureDao.java +++ b/src/main/java/org/opensearch/timeseries/feature/SearchFeatureDao.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.time.Clock; import java.time.ZonedDateTime; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -29,6 +30,7 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -65,6 +67,7 @@ import org.opensearch.timeseries.constant.CommonName; import org.opensearch.timeseries.model.Config; import org.opensearch.timeseries.model.Entity; +import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.util.ParseUtils; import org.opensearch.timeseries.util.SecurityClientUtil; @@ -92,7 +95,6 @@ public SearchFeatureDao( Client client, NamedXContentRegistry xContent, SecurityClientUtil clientUtil, - Settings settings, ClusterService clusterService, int minimumDocCount, Clock clock, @@ -122,7 +124,6 @@ public SearchFeatureDao( * @param client ES client for queries * @param xContent ES XContentRegistry * @param clientUtil utility for ES client - * @param settings ES settings * @param clusterService ES ClusterService * @param minimumDocCount minimum doc count required for an entity; used to * make sure an entity has enough samples for preview @@ -139,7 +140,6 @@ public SearchFeatureDao( client, xContent, clientUtil, - settings, clusterService, minimumDocCount, Clock.systemUTC(), @@ -690,37 +690,7 @@ public void getColdStartSamplesForPeriods( ) { SearchRequest request = createColdStartFeatureSearchRequest(config, ranges, entity); final ActionListener searchResponseListener = ActionListener.wrap(response -> { - Aggregations aggs = response.getAggregations(); - if (aggs == null) { - listener.onResponse(Collections.emptyList()); - return; - } - - long docCountThreshold = includesEmptyBucket ? -1 : 0; - - // Extract buckets and order by from_as_string. Currently by default it is ascending. Better not to assume it. - // Example responses from date range bucket aggregation: - // "aggregations":{"date_range":{"buckets":[{"key":"1598865166000-1598865226000","from":1.598865166E12," - // from_as_string":"1598865166000","to":1.598865226E12,"to_as_string":"1598865226000","doc_count":3, - // "deny_max":{"value":154.0}},{"key":"1598869006000-1598869066000","from":1.598869006E12, - // "from_as_string":"1598869006000","to":1.598869066E12,"to_as_string":"1598869066000","doc_count":3, - // "deny_max":{"value":141.0}}, - // We don't want to use default 0 for sum/count aggregation as it might cause false positives during scoring. - // Terms aggregation only returns non-zero count values. If we use a lot of 0s during cold start, - // we will see alarming very easily. - listener - .onResponse( - aggs - .asList() - .stream() - .filter(InternalDateRange.class::isInstance) - .flatMap(agg -> ((InternalDateRange) agg).getBuckets().stream()) - .filter(bucket -> bucket.getFrom() != null && bucket.getFrom() instanceof ZonedDateTime) - .filter(bucket -> bucket.getDocCount() > docCountThreshold) - .sorted(Comparator.comparing((Bucket bucket) -> (ZonedDateTime) bucket.getFrom())) - .map(bucket -> parseBucket(bucket, config.getEnabledFeatureIds())) - .collect(Collectors.toList()) - ); + listener.onResponse(parseColdStartSampleResp(response, includesEmptyBucket, config)); }, listener::onFailure); // inject user role while searching. @@ -735,7 +705,109 @@ public void getColdStartSamplesForPeriods( ); } - private SearchRequest createColdStartFeatureSearchRequest(Config detector, List> ranges, Optional entity) { + /** + * Parses the response from a search query for cold start samples, extracting and processing + * the relevant buckets to obtain their parsed values. + * + * This method processes the aggregations from the provided search response to extract and sort the + * buckets. Only buckets that meet certain criteria are included: + * - The `from` field must be a non-null instance of `ZonedDateTime`. + * - The bucket's document count must be greater than the specified threshold, which is determined + * by the `includesEmptyBucket` parameter. + * + * The method returns a list of Optional double array containing the parsed values of the buckets. + * Buckets for which the parseBucket method returns Optional.empty() are excluded from the final list. + * + * @param response the search response containing the aggregations + * @param includesEmptyBucket a boolean flag indicating whether to include buckets with a document + * count of zero (if true, the threshold is set to -1; otherwise, it is set to 0) + * @param config the configuration object containing necessary settings and feature IDs + * @return a list of Optional double array containing the parsed values of the valid buckets, or an empty + * list if the aggregations are null or no valid buckets are found + */ + public List> parseColdStartSampleResp(SearchResponse response, boolean includesEmptyBucket, Config config) { + Aggregations aggs = response.getAggregations(); + if (aggs == null) { + logger.warn("Unexpected empty response"); + return Collections.emptyList(); + } + + long docCountThreshold = includesEmptyBucket ? -1 : 0; + + // Extract buckets and order by from_as_string. Currently by default it is ascending. Better not to assume it. + // Example responses from date range bucket aggregation: + // "aggregations":{"date_range":{"buckets":[{"key":"1598865166000-1598865226000","from":1.598865166E12," + // from_as_string":"1598865166000","to":1.598865226E12,"to_as_string":"1598865226000","doc_count":3, + // "deny_max":{"value":154.0}},{"key":"1598869006000-1598869066000","from":1.598869006E12, + // "from_as_string":"1598869006000","to":1.598869066E12,"to_as_string":"1598869066000","doc_count":3, + // "deny_max":{"value":141.0}}, + // We don't want to use default 0 for sum/count aggregation as it might cause false positives during scoring. + // Terms aggregation only returns non-zero count values. If we use a lot of 0s during cold start, + // we will see alarming very easily. + return aggs + .asList() + .stream() + .filter(InternalDateRange.class::isInstance) + .flatMap(agg -> ((InternalDateRange) agg).getBuckets().stream()) + .filter(bucket -> bucket.getFrom() != null && bucket.getFrom() instanceof ZonedDateTime) + .filter(bucket -> bucket.getDocCount() > docCountThreshold) + .sorted(Comparator.comparing((Bucket bucket) -> (ZonedDateTime) bucket.getFrom())) + .map(bucket -> parseBucket(bucket, config.getEnabledFeatureIds())) + .collect(Collectors.toList()); + } + + /** + * Parses the timestamps of the buckets from a search response for cold start samples. + * + * This method processes the aggregations from the provided search response to extract and sort the + * timestamps of the buckets. Only buckets that meet certain criteria are included: + * - The `from` field must be a non-null instance of `ZonedDateTime`. + * - The bucket's document count must be greater than the specified threshold, which is determined + * by the `includesEmptyBucket` parameter. + * - The bucket must have a non-empty result from the `parseBucket` method. + * + * The method returns a list of epoch millisecond timestamps of the `from` fields of the valid buckets. + * + * @param response the search response containing the aggregations + * @param includesEmptyBucket a boolean flag indicating whether to include buckets with a document + * count of zero (if true, the threshold is set to -1; otherwise, it is set to 0) + * @param config the configuration object containing feature enabled information + * @return a list of epoch millisecond timestamps of the valid bucket `from` fields, or an empty list + * if the aggregations are null or no valid buckets are found + */ + public List parseColdStartSampleTimestamp(SearchResponse response, boolean includesEmptyBucket, Config config) { + Aggregations aggs = response.getAggregations(); + if (aggs == null) { + logger.warn("Unexpected empty response"); + return Collections.emptyList(); + } + + long docCountThreshold = includesEmptyBucket ? -1 : 0; + + // Extract buckets and order by from_as_string. Currently by default it is ascending. Better not to assume it. + // Example responses from date range bucket aggregation: + // "aggregations":{"date_range":{"buckets":[{"key":"1598865166000-1598865226000","from":1.598865166E12," + // from_as_string":"1598865166000","to":1.598865226E12,"to_as_string":"1598865226000","doc_count":3, + // "deny_max":{"value":154.0}},{"key":"1598869006000-1598869066000","from":1.598869006E12, + // "from_as_string":"1598869006000","to":1.598869066E12,"to_as_string":"1598869066000","doc_count":3, + // "deny_max":{"value":141.0}}, + // We don't want to use default 0 for sum/count aggregation as it might cause false positives during scoring. + // Terms aggregation only returns non-zero count values. If we use a lot of 0s during cold start, + // we will see alarming very easily. + return aggs + .asList() + .stream() + .filter(InternalDateRange.class::isInstance) + .flatMap(agg -> ((InternalDateRange) agg).getBuckets().stream()) + .filter(bucket -> bucket.getFrom() != null && bucket.getFrom() instanceof ZonedDateTime) + .filter(bucket -> bucket.getDocCount() > docCountThreshold) + .filter(bucket -> parseBucket(bucket, config.getEnabledFeatureIds()).isPresent()) + .sorted(Comparator.comparing((Bucket bucket) -> (ZonedDateTime) bucket.getFrom())) + .map(bucket -> ((ZonedDateTime) bucket.getFrom()).toInstant().toEpochMilli()) + .collect(Collectors.toList()); + } + + public SearchRequest createColdStartFeatureSearchRequest(Config detector, List> ranges, Optional entity) { try { SearchSourceBuilder searchSourceBuilder = ParseUtils.generateColdStartQuery(detector, ranges, entity, xContent); return new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder); @@ -754,8 +826,64 @@ private SearchRequest createColdStartFeatureSearchRequest(Config detector, List< } } + public SearchRequest createColdStartFeatureSearchRequestForSingleFeature( + Config detector, + List> ranges, + Optional entity, + int featureIndex + ) { + try { + SearchSourceBuilder searchSourceBuilder = ParseUtils + .generateColdStartQueryForSingleFeature(detector, ranges, entity, xContent, featureIndex); + return new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder); + } catch (IOException e) { + logger + .warn( + "Failed to create cold start feature search request for " + + detector.getId() + + " from " + + ranges.get(0).getKey() + + " to " + + ranges.get(ranges.size() - 1).getKey(), + e + ); + throw new IllegalStateException(e); + } + } + @Override public Optional parseBucket(MultiBucketsAggregation.Bucket bucket, List featureIds) { return parseAggregations(Optional.ofNullable(bucket).map(b -> b.getAggregations()), featureIds); } + + /** + * Get train samples within a time range. + * + * @param interval interval to compute ranges + * @param startMilli range start + * @param endMilli range end + * @param numberOfSamples maximum training samples to fetch + * @return list of sample time ranges in ascending order + */ + public List> getTrainSampleRanges( + IntervalTimeConfiguration interval, + long startMilli, + long endMilli, + int numberOfSamples + ) { + long bucketSize = interval.toDuration().toMillis(); + int numBuckets = (int) Math.floor((endMilli - startMilli) / (double) bucketSize); + // adjust if numStrides is more than the max samples + int numIntervals = Math.min(numBuckets, numberOfSamples); + List> sampleRanges = Stream + .iterate(endMilli, i -> i - bucketSize) + .limit(numIntervals) + .map(time -> new SimpleImmutableEntry<>(time - bucketSize, time)) + .collect(Collectors.toList()); + + // Reverse the list to get time ranges in ascending order + Collections.reverse(sampleRanges); + + return sampleRanges; + } } diff --git a/src/main/java/org/opensearch/timeseries/ml/ModelColdStart.java b/src/main/java/org/opensearch/timeseries/ml/ModelColdStart.java index b3e1b04ab..3ccd00428 100644 --- a/src/main/java/org/opensearch/timeseries/ml/ModelColdStart.java +++ b/src/main/java/org/opensearch/timeseries/ml/ModelColdStart.java @@ -14,9 +14,7 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; -import java.util.Collections; import java.util.Deque; import java.util.List; import java.util.Locale; @@ -24,8 +22,6 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -422,13 +418,13 @@ private void getFeatures( // Create ranges in ascending where the last sample's end time is the given endTimeMs. // Sample ranges are also in ascending order in Opensearch's response. - List> sampleRanges = getTrainSampleRanges(config, startTimeMs, endTimeMs, numberOfSamples); + List> sampleRanges = searchFeatureDao + .getTrainSampleRanges((IntervalTimeConfiguration) config.getInterval(), startTimeMs, endTimeMs, numberOfSamples); if (sampleRanges.isEmpty()) { listener.onResponse(lastRounddataSample); return; } - ActionListener>> getFeaturelistener = ActionListener.wrap(featureSamples -> { int totalNumSamples = featureSamples.size(); @@ -518,32 +514,6 @@ private void getFeatures( } } - /** - * Get train samples within a time range. - * - * @param config accessor to config - * @param startMilli range start - * @param endMilli range end - * @param numberOfSamples maximum training samples to fetch - * @return list of sample time ranges in ascending order - */ - private List> getTrainSampleRanges(Config config, long startMilli, long endMilli, int numberOfSamples) { - long bucketSize = ((IntervalTimeConfiguration) config.getInterval()).toDuration().toMillis(); - int numBuckets = (int) Math.floor((endMilli - startMilli) / (double) bucketSize); - // adjust if numStrides is more than the max samples - int numIntervals = Math.min(numBuckets, numberOfSamples); - List> sampleRanges = Stream - .iterate(endMilli, i -> i - bucketSize) - .limit(numIntervals) - .map(time -> new SimpleImmutableEntry<>(time - bucketSize, time)) - .collect(Collectors.toList()); - - // Reverse the list to get time ranges in ascending order - Collections.reverse(sampleRanges); - - return sampleRanges; - } - // Method to apply imputation method based on the imputation option public static > T applyImputationMethod(Config config, T builder) { ImputationOption imputationOption = config.getImputationOption(); diff --git a/src/main/java/org/opensearch/timeseries/model/IntervalTimeConfiguration.java b/src/main/java/org/opensearch/timeseries/model/IntervalTimeConfiguration.java index 22c0fb416..589d11e4d 100644 --- a/src/main/java/org/opensearch/timeseries/model/IntervalTimeConfiguration.java +++ b/src/main/java/org/opensearch/timeseries/model/IntervalTimeConfiguration.java @@ -89,10 +89,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Generated @Override public boolean equals(Object o) { - if (this == o) + if (this == o) { return true; - if (o == null || getClass() != o.getClass()) + } + if (o == null || getClass() != o.getClass()) { return false; + } IntervalTimeConfiguration that = (IntervalTimeConfiguration) o; return getInterval() == that.getInterval() && getUnit() == that.getUnit(); } diff --git a/src/main/java/org/opensearch/timeseries/model/TimeConfiguration.java b/src/main/java/org/opensearch/timeseries/model/TimeConfiguration.java index d370e524e..83e760c4c 100644 --- a/src/main/java/org/opensearch/timeseries/model/TimeConfiguration.java +++ b/src/main/java/org/opensearch/timeseries/model/TimeConfiguration.java @@ -72,4 +72,5 @@ public static TimeConfiguration parse(XContentParser parser) throws IOException } throw new IllegalArgumentException("Find no schedule definition"); } + } diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/AggregationPrep.java b/src/main/java/org/opensearch/timeseries/rest/handler/AggregationPrep.java new file mode 100644 index 000000000..534e4b3b9 --- /dev/null +++ b/src/main/java/org/opensearch/timeseries/rest/handler/AggregationPrep.java @@ -0,0 +1,222 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.timeseries.rest.handler; + +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.aggregations.AggregationBuilders; +import org.opensearch.search.aggregations.Aggregations; +import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.opensearch.search.aggregations.bucket.histogram.Histogram; +import org.opensearch.search.aggregations.bucket.histogram.LongBounds; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.timeseries.common.exception.ValidationException; +import org.opensearch.timeseries.constant.CommonMessages; +import org.opensearch.timeseries.feature.SearchFeatureDao; +import org.opensearch.timeseries.model.Config; +import org.opensearch.timeseries.model.Entity; +import org.opensearch.timeseries.model.IntervalTimeConfiguration; +import org.opensearch.timeseries.model.ValidationAspect; +import org.opensearch.timeseries.model.ValidationIssueType; + +public class AggregationPrep { + protected static final Logger logger = LogManager.getLogger(AggregationPrep.class); + + private SearchFeatureDao dateRangeHelper; + private Config config; + TimeValue requestTimeout; + + public static final String AGGREGATION = "agg"; + + public AggregationPrep(SearchFeatureDao dateRangeHelper, TimeValue requestTimeout, Config config) { + this.dateRangeHelper = dateRangeHelper; + this.requestTimeout = requestTimeout; + this.config = config; + } + + public LongBounds getTimeRangeBounds(IntervalTimeConfiguration interval, long endMillis) { + long intervalInMillis = IntervalTimeConfiguration.getIntervalInMinute(interval) * 60000; + Long startMillis = endMillis - (getNumberOfSamples() * intervalInMillis); + return new LongBounds(startMillis, endMillis); + } + + public int getNumberOfSamples() { + return config.getHistoryIntervals(); + } + + public double getBucketHitRate(SearchResponse response, IntervalTimeConfiguration currentInterval, long endMillis) { + // as feature query might contain filter, use feature query as we do in cold start + if (config.getEnabledFeatureIds() != null && config.getEnabledFeatureIds().size() > 0) { + List> features = dateRangeHelper.parseColdStartSampleResp(response, false, config); + return features.stream().filter(Optional::isPresent).count() / getNumberOfSamples(); + } else { + return getHistorgramBucketHitRate(response); + } + } + + public double getHistorgramBucketHitRate(SearchResponse response) { + Histogram histogram = validateAndRetrieveHistogramAggregation(response); + if (histogram == null || histogram.getBuckets() == null) { + logger.warn("Empty histogram buckets"); + return 0; + } + // getBuckets returns non-empty bucket (e.g., doc_count > 0) + int bucketCount = histogram.getBuckets().size(); + + return bucketCount / getNumberOfSamples(); + } + + public List getTimestamps(SearchResponse response) { + if (config.getEnabledFeatureIds() != null && config.getEnabledFeatureIds().size() > 0) { + return dateRangeHelper.parseColdStartSampleTimestamp(response, false, config); + } else { + Histogram aggregate = validateAndRetrieveHistogramAggregation(response); + // In all cases, when the specified end time does not exist, the actual end time is the closest available time after the + // specified end. + // so we only have non-empty buckets + // in the original order, buckets are sorted in the ascending order of timestamps. + // Since the stream processing preserves the order of elements, we don't need to sort timestamps again. + return aggregate + .getBuckets() + .stream() + .map(entry -> AggregationPrep.convertKeyToEpochMillis(entry.getKey())) + .collect(Collectors.toList()); + } + } + + public SearchRequest createSearchRequest( + IntervalTimeConfiguration currentInterval, + LongBounds currentTimeStampBounds, + Map topEntity + ) { + if (config.getEnabledFeatureIds() != null && config.getEnabledFeatureIds().size() > 0) { + List> ranges = dateRangeHelper + .getTrainSampleRanges( + currentInterval, + currentTimeStampBounds.getMin(), + currentTimeStampBounds.getMax(), + getNumberOfSamples() + ); + return dateRangeHelper + .createColdStartFeatureSearchRequest( + config, + ranges, + topEntity.size() == 0 ? Optional.empty() : Optional.of(Entity.createEntityByReordering(topEntity)) + ); + } else { + return composeHistogramQuery( + topEntity, + (int) IntervalTimeConfiguration.getIntervalInMinute(currentInterval), + currentTimeStampBounds + ); + } + } + + public SearchRequest createSearchRequestForFeature( + IntervalTimeConfiguration currentInterval, + LongBounds currentTimeStampBounds, + Map topEntity, + int featureIndex + ) { + if (config.getEnabledFeatureIds() != null && config.getEnabledFeatureIds().size() > 0) { + List> ranges = dateRangeHelper + .getTrainSampleRanges( + currentInterval, + currentTimeStampBounds.getMin(), + currentTimeStampBounds.getMax(), + getNumberOfSamples() + ); + return dateRangeHelper + .createColdStartFeatureSearchRequestForSingleFeature( + config, + ranges, + topEntity.size() == 0 ? Optional.empty() : Optional.of(Entity.createEntityByReordering(topEntity)), + featureIndex + ); + } else { + throw new IllegalArgumentException("empty feature"); + } + } + + public static long convertKeyToEpochMillis(Object key) { + return key instanceof ZonedDateTime ? ((ZonedDateTime) key).toInstant().toEpochMilli() + : key instanceof Double ? ((Double) key).longValue() + : key instanceof Long ? (Long) key + : -1L; + } + + public SearchRequest composeHistogramQuery(Map topEntity, int intervalInMinutes, LongBounds timeStampBounds) { + AggregationBuilder aggregation = getHistogramAggregation(intervalInMinutes, timeStampBounds); + BoolQueryBuilder query = QueryBuilders.boolQuery().filter(config.getFilterQuery()); + if (config.isHighCardinality()) { + if (topEntity.isEmpty()) { + throw new ValidationException( + CommonMessages.CATEGORY_FIELD_TOO_SPARSE, + ValidationIssueType.CATEGORY, + ValidationAspect.MODEL + ); + } + for (Map.Entry entry : topEntity.entrySet()) { + query.filter(QueryBuilders.termQuery(entry.getKey(), entry.getValue())); + } + } + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() + .query(query) + .aggregation(aggregation) + .size(0) + .timeout(requestTimeout); + return new SearchRequest(config.getIndices().toArray(new String[0])).source(searchSourceBuilder); + } + + public Histogram validateAndRetrieveHistogramAggregation(SearchResponse response) { + Aggregations aggs = response.getAggregations(); + if (aggs == null) { + // This would indicate some bug or some opensearch core changes that we are not aware of (we don't keep up-to-date with + // the large amounts of changes there). For this reason I'm not throwing a SearchException but instead a validation exception + // which will be converted to validation response. + logger.warn("Unexpected null aggregation."); + throw new ValidationException( + CommonMessages.MODEL_VALIDATION_FAILED_UNEXPECTEDLY, + ValidationIssueType.AGGREGATION, + ValidationAspect.MODEL + ); + } + Histogram aggregate = aggs.get(AGGREGATION); + if (aggregate == null) { + throw new IllegalArgumentException("Failed to find valid aggregation result"); + } + return aggregate; + } + + public AggregationBuilder getHistogramAggregation(int intervalInMinutes, LongBounds timeStampBound) { + return AggregationBuilders + .dateHistogram(AggregationPrep.AGGREGATION) + .field(config.getTimeField()) + .minDocCount(1) + .hardBounds(timeStampBound) + .fixedInterval(DateHistogramInterval.minutes(intervalInMinutes)); + } + + public SearchSourceBuilder getSearchSourceBuilder(QueryBuilder query, AggregationBuilder aggregation) { + return new SearchSourceBuilder().query(query).aggregation(aggregation).size(0).timeout(requestTimeout); + } + +} diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/HistogramAggregationHelper.java b/src/main/java/org/opensearch/timeseries/rest/handler/HistogramAggregationHelper.java deleted file mode 100644 index eeaff0dc1..000000000 --- a/src/main/java/org/opensearch/timeseries/rest/handler/HistogramAggregationHelper.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.timeseries.rest.handler; - -import java.time.Duration; -import java.time.ZonedDateTime; -import java.util.List; -import java.util.Optional; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.index.query.QueryBuilder; -import org.opensearch.search.aggregations.AggregationBuilder; -import org.opensearch.search.aggregations.AggregationBuilders; -import org.opensearch.search.aggregations.Aggregations; -import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; -import org.opensearch.search.aggregations.bucket.histogram.Histogram; -import org.opensearch.search.aggregations.bucket.histogram.Histogram.Bucket; -import org.opensearch.search.aggregations.bucket.histogram.LongBounds; -import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.timeseries.common.exception.ValidationException; -import org.opensearch.timeseries.constant.CommonMessages; -import org.opensearch.timeseries.model.Config; -import org.opensearch.timeseries.model.IntervalTimeConfiguration; -import org.opensearch.timeseries.model.TimeConfiguration; -import org.opensearch.timeseries.model.ValidationAspect; -import org.opensearch.timeseries.model.ValidationIssueType; -import org.opensearch.timeseries.settings.TimeSeriesSettings; - -/** - * the class provides helper methods specifically for histogram aggregations - * - */ -public class HistogramAggregationHelper { - protected static final Logger logger = LogManager.getLogger(HistogramAggregationHelper.class); - - protected static final String AGGREGATION = "agg"; - - private Config config; - private final TimeValue requestTimeout; - - public HistogramAggregationHelper(Config config, TimeValue requestTimeout) { - this.config = config; - this.requestTimeout = requestTimeout; - } - - public Histogram checkBucketResultErrors(SearchResponse response) { - Aggregations aggs = response.getAggregations(); - if (aggs == null) { - // This would indicate some bug or some opensearch core changes that we are not aware of (we don't keep up-to-date with - // the large amounts of changes there). For this reason I'm not throwing a SearchException but instead a validation exception - // which will be converted to validation response. - logger.warn("Unexpected null aggregation."); - throw new ValidationException( - CommonMessages.MODEL_VALIDATION_FAILED_UNEXPECTEDLY, - ValidationIssueType.AGGREGATION, - ValidationAspect.MODEL - ); - } - Histogram aggregate = aggs.get(AGGREGATION); - if (aggregate == null) { - throw new IllegalArgumentException("Failed to find valid aggregation result"); - } - return aggregate; - } - - public AggregationBuilder getBucketAggregation(int intervalInMinutes, LongBounds timeStampBound) { - return AggregationBuilders - .dateHistogram(AGGREGATION) - .field(config.getTimeField()) - .minDocCount(1) - .hardBounds(timeStampBound) - .fixedInterval(DateHistogramInterval.minutes(intervalInMinutes)); - } - - public Long timeConfigToMilliSec(TimeConfiguration timeConfig) { - return Optional.ofNullable((IntervalTimeConfiguration) timeConfig).map(t -> t.toDuration().toMillis()).orElse(0L); - } - - public LongBounds getTimeRangeBounds(long endMillis, long intervalInMillis) { - Long startMillis = endMillis - (getNumberOfSamples(intervalInMillis) * intervalInMillis); - return new LongBounds(startMillis, endMillis); - } - - public int getNumberOfSamples(long intervalInMillis) { - return Math - .max( - (int) (Duration.ofHours(TimeSeriesSettings.TRAIN_SAMPLE_TIME_RANGE_IN_HOURS).toMillis() / intervalInMillis), - TimeSeriesSettings.MIN_TRAIN_SAMPLES - ); - } - - /** - * @param histogram buckets returned via Date historgram aggregation - * @param intervalInMillis suggested interval to use - * @return the number of buckets having data - */ - public double processBucketAggregationResults(Histogram histogram, long intervalInMillis, Config config) { - // In all cases, when the specified end time does not exist, the actual end time is the closest available time after the specified - // end. - // so we only have non-empty buckets - List bucketsInResponse = histogram.getBuckets(); - if (bucketsInResponse.size() >= config.getShingleSize() + TimeSeriesSettings.NUM_MIN_SAMPLES) { - long minTimestampMillis = convertKeyToEpochMillis(bucketsInResponse.get(0).getKey()); - long maxTimestampMillis = convertKeyToEpochMillis(bucketsInResponse.get(bucketsInResponse.size() - 1).getKey()); - double totalBuckets = (maxTimestampMillis - minTimestampMillis) / intervalInMillis; - return histogram.getBuckets().size() / totalBuckets; - } - return 0; - } - - public SearchSourceBuilder getSearchSourceBuilder(QueryBuilder query, AggregationBuilder aggregation) { - return new SearchSourceBuilder().query(query).aggregation(aggregation).size(0).timeout(requestTimeout); - } - - public static long convertKeyToEpochMillis(Object key) { - return key instanceof ZonedDateTime ? ((ZonedDateTime) key).toInstant().toEpochMilli() - : key instanceof Double ? ((Double) key).longValue() - : key instanceof Long ? (Long) key - : -1L; - } -} diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/IntervalCalculation.java b/src/main/java/org/opensearch/timeseries/rest/handler/IntervalCalculation.java index e3c8a403e..20c090d4b 100644 --- a/src/main/java/org/opensearch/timeseries/rest/handler/IntervalCalculation.java +++ b/src/main/java/org/opensearch/timeseries/rest/handler/IntervalCalculation.java @@ -12,9 +12,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.Pair; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.search.SearchRequest; @@ -23,15 +21,12 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.commons.authuser.User; import org.opensearch.core.action.ActionListener; -import org.opensearch.index.query.BoolQueryBuilder; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.search.aggregations.AggregationBuilder; -import org.opensearch.search.aggregations.bucket.histogram.Histogram; import org.opensearch.search.aggregations.bucket.histogram.LongBounds; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.timeseries.AnalysisType; import org.opensearch.timeseries.common.exception.ValidationException; import org.opensearch.timeseries.constant.CommonMessages; +import org.opensearch.timeseries.feature.SearchFeatureDao; import org.opensearch.timeseries.model.Config; import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.model.ValidationAspect; @@ -42,15 +37,14 @@ public class IntervalCalculation { private final Logger logger = LogManager.getLogger(IntervalCalculation.class); - private final Config config; - private final TimeValue requestTimeout; - private final HistogramAggregationHelper histogramAggHelper; + private final AggregationPrep aggregationPrep; private final Client client; private final SecurityClientUtil clientUtil; private final User user; private final AnalysisType context; private final Clock clock; - private final FullBucketRatePredicate acceptanceCriteria; + private final Map topEntity; + private final long endMillis; public IntervalCalculation( Config config, @@ -59,49 +53,43 @@ public IntervalCalculation( SecurityClientUtil clientUtil, User user, AnalysisType context, - Clock clock + Clock clock, + SearchFeatureDao searchFeatureDao, + long latestTime, + Map topEntity ) { - this.config = config; - this.requestTimeout = requestTimeout; - this.histogramAggHelper = new HistogramAggregationHelper(config, requestTimeout); + this.aggregationPrep = new AggregationPrep(searchFeatureDao, requestTimeout, config); this.client = client; this.clientUtil = clientUtil; this.user = user; this.context = context; this.clock = clock; - this.acceptanceCriteria = new FullBucketRatePredicate(); + this.topEntity = topEntity; + this.endMillis = latestTime; } - public void findInterval(long latestTime, Map topEntity, ActionListener listener) { - ActionListener> minimumIntervalListener = ActionListener.wrap(minIntervalAndValidity -> { - if (minIntervalAndValidity.getRight()) { - // the minimum interval is also the interval passing acceptance criteria and we can return immediately - listener.onResponse(minIntervalAndValidity.getLeft()); - } else if (minIntervalAndValidity.getLeft() == null) { + public void findInterval(ActionListener listener) { + ActionListener minimumIntervalListener = ActionListener.wrap(minInterval -> { + if (minInterval == null) { // the minimum interval is too large listener.onResponse(null); } else { - // starting exploring larger interval - getBucketAggregates(latestTime, topEntity, minIntervalAndValidity.getLeft(), listener); + // starting exploring whether minimum or larger interval satisfy density requirement + getBucketAggregates(minInterval, listener); } }, listener::onFailure); // we use 1 minute = 60000 milliseconds to find minimum interval - LongBounds longBounds = histogramAggHelper.getTimeRangeBounds(latestTime, 60000); - findMinimumInterval(topEntity, longBounds, minimumIntervalListener); + LongBounds longBounds = aggregationPrep.getTimeRangeBounds(new IntervalTimeConfiguration(1, ChronoUnit.MINUTES), endMillis); + findMinimumInterval(longBounds, minimumIntervalListener); } - private void getBucketAggregates( - long latestTime, - Map topEntity, - IntervalTimeConfiguration minimumInterval, - ActionListener listener - ) throws IOException { + private void getBucketAggregates(IntervalTimeConfiguration minimumInterval, ActionListener listener) + throws IOException { try { - int newIntervalInMinutes = increaseAndGetNewInterval(minimumInterval); - LongBounds timeStampBounds = histogramAggHelper.getTimeRangeBounds(latestTime, newIntervalInMinutes); - SearchRequest searchRequest = composeIntervalQuery(topEntity, newIntervalInMinutes, timeStampBounds); + LongBounds timeStampBounds = aggregationPrep.getTimeRangeBounds(minimumInterval, endMillis); + SearchRequest searchRequest = aggregationPrep.createSearchRequest(minimumInterval, timeStampBounds, topEntity); ActionListener intervalListener = ActionListener .wrap(interval -> listener.onResponse(interval), exception -> { listener.onFailure(exception); @@ -110,9 +98,8 @@ private void getBucketAggregates( final ActionListener searchResponseListener = new IntervalRecommendationListener( intervalListener, searchRequest.source(), - (IntervalTimeConfiguration) config.getInterval(), + minimumInterval, clock.millis() + TimeSeriesSettings.TOP_VALIDATE_TIMEOUT_IN_MILLIS, - latestTime, timeStampBounds ); // using the original context in listener as user roles have no permissions for internal operations like fetching a @@ -151,10 +138,8 @@ private int increaseAndGetNewInterval(IntervalTimeConfiguration oldInterval) { */ class IntervalRecommendationListener implements ActionListener { private final ActionListener intervalListener; - SearchSourceBuilder searchSourceBuilder; IntervalTimeConfiguration currentIntervalToTry; private final long expirationEpochMs; - private final long latestTime; private LongBounds currentTimeStampBounds; IntervalRecommendationListener( @@ -162,34 +147,19 @@ class IntervalRecommendationListener implements ActionListener { SearchSourceBuilder searchSourceBuilder, IntervalTimeConfiguration currentIntervalToTry, long expirationEpochMs, - long latestTime, LongBounds timeStampBounds ) { this.intervalListener = intervalListener; - this.searchSourceBuilder = searchSourceBuilder; this.currentIntervalToTry = currentIntervalToTry; this.expirationEpochMs = expirationEpochMs; - this.latestTime = latestTime; this.currentTimeStampBounds = timeStampBounds; } @Override public void onResponse(SearchResponse response) { try { - Histogram aggregate = null; - try { - aggregate = histogramAggHelper.checkBucketResultErrors(response); - } catch (ValidationException e) { - intervalListener.onFailure(e); - } - - if (aggregate == null) { - intervalListener.onResponse(null); - return; - } - int newIntervalMinute = increaseAndGetNewInterval(currentIntervalToTry); - double fullBucketRate = histogramAggHelper.processBucketAggregationResults(aggregate, newIntervalMinute * 60000, config); + double fullBucketRate = aggregationPrep.getBucketHitRate(response, currentIntervalToTry, endMillis); // If rate is above success minimum then return interval suggestion. if (fullBucketRate > TimeSeriesSettings.INTERVAL_BUCKET_MINIMUM_SUCCESS_RATE) { intervalListener.onResponse(this.currentIntervalToTry); @@ -221,18 +191,12 @@ public void onResponse(SearchResponse response) { private void searchWithDifferentInterval(int newIntervalMinuteValue) { this.currentIntervalToTry = new IntervalTimeConfiguration(newIntervalMinuteValue, ChronoUnit.MINUTES); - this.currentTimeStampBounds = histogramAggHelper.getTimeRangeBounds(latestTime, newIntervalMinuteValue); - // Searching again using an updated interval - SearchSourceBuilder updatedSearchSourceBuilder = histogramAggHelper - .getSearchSourceBuilder( - searchSourceBuilder.query(), - histogramAggHelper.getBucketAggregation(newIntervalMinuteValue, currentTimeStampBounds) - ); + this.currentTimeStampBounds = aggregationPrep.getTimeRangeBounds(currentIntervalToTry, endMillis); // using the original context in listener as user roles have no permissions for internal operations like fetching a // checkpoint clientUtil .asyncRequestWithInjectedSecurity( - new SearchRequest().indices(config.getIndices().toArray(new String[0])).source(updatedSearchSourceBuilder), + aggregationPrep.createSearchRequest(currentIntervalToTry, currentTimeStampBounds, topEntity), client::search, user, client, @@ -281,43 +245,17 @@ public void onFailure(Exception e) { * * @param topEntity top entity to use * @param timeStampBounds Used to determine start and end date range to search for data - * @param listener returns minimum interval and whether the interval passes data density test + * @param listener returns minimum interval */ - private void findMinimumInterval( - Map topEntity, - LongBounds timeStampBounds, - ActionListener> listener - ) { + private void findMinimumInterval(LongBounds timeStampBounds, ActionListener listener) { try { - SearchRequest searchRequest = composeIntervalQuery(topEntity, 1, timeStampBounds); + SearchRequest searchRequest = aggregationPrep + .createSearchRequest(new IntervalTimeConfiguration(1, ChronoUnit.MINUTES), timeStampBounds, topEntity); final ActionListener searchResponseListener = ActionListener.wrap(response -> { - Histogram aggregate = null; - try { - aggregate = histogramAggHelper.checkBucketResultErrors(response); - } catch (ValidationException e) { - listener.onFailure(e); - } - - if (aggregate == null) { - // fail to find the minimum interval. Return one minute. - logger.warn("Fail to get aggregated result"); - listener.onResponse(Pair.of(new IntervalTimeConfiguration(1, ChronoUnit.MINUTES), Boolean.FALSE)); - return; - } - // In all cases, when the specified end time does not exist, the actual end time is the closest available time after the - // specified end. - // so we only have non-empty buckets - // in the original order, buckets are sorted in the ascending order of timestamps. - // Since the stream processing preserves the order of elements, we don't need to sort timestamps again. - List timestamps = aggregate - .getBuckets() - .stream() - .map(entry -> HistogramAggregationHelper.convertKeyToEpochMillis(entry.getKey())) - .collect(Collectors.toList()); - + List timestamps = aggregationPrep.getTimestamps(response); if (timestamps.isEmpty()) { logger.warn("empty data, return one minute by default"); - listener.onResponse(Pair.of(new IntervalTimeConfiguration(1, ChronoUnit.MINUTES), Boolean.FALSE)); + listener.onResponse(new IntervalTimeConfiguration(1, ChronoUnit.MINUTES)); return; } @@ -325,17 +263,10 @@ private void findMinimumInterval( long minimumMinutes = millisecondsToCeilMinutes(((Double) medianDifference).longValue()); if (minimumMinutes > TimeSeriesSettings.MAX_INTERVAL_REC_LENGTH_IN_MINUTES) { logger.warn("The minimum interval is too large: {}", minimumMinutes); - listener.onResponse(Pair.of(null, false)); + listener.onResponse(null); return; } - listener - .onResponse( - Pair - .of( - new IntervalTimeConfiguration(minimumMinutes, ChronoUnit.MINUTES), - acceptanceCriteria.test(aggregate, minimumMinutes) - ) - ); + listener.onResponse(new IntervalTimeConfiguration(minimumMinutes, ChronoUnit.MINUTES)); }, listener::onFailure); // using the original context in listener as user roles have no permissions for internal operations like fetching a // checkpoint @@ -389,43 +320,4 @@ private static long millisecondsToCeilMinutes(long milliseconds) { // whole minute is rounded up to the next minute. return (milliseconds + 59999) / 60000; } - - private SearchRequest composeIntervalQuery(Map topEntity, int intervalInMinutes, LongBounds timeStampBounds) { - AggregationBuilder aggregation = histogramAggHelper.getBucketAggregation(intervalInMinutes, timeStampBounds); - BoolQueryBuilder query = QueryBuilders.boolQuery().filter(config.getFilterQuery()); - if (config.isHighCardinality()) { - if (topEntity.isEmpty()) { - throw new ValidationException( - CommonMessages.CATEGORY_FIELD_TOO_SPARSE, - ValidationIssueType.CATEGORY, - ValidationAspect.MODEL - ); - } - for (Map.Entry entry : topEntity.entrySet()) { - query.filter(QueryBuilders.termQuery(entry.getKey(), entry.getValue())); - } - } - - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() - .query(query) - .aggregation(aggregation) - .size(0) - .timeout(requestTimeout); - return new SearchRequest(config.getIndices().toArray(new String[0])).source(searchSourceBuilder); - } - - interface HistogramPredicate { - boolean test(Histogram histogram, long minimumMinutes); - } - - class FullBucketRatePredicate implements HistogramPredicate { - - @Override - public boolean test(Histogram histogram, long minimumMinutes) { - double fullBucketRate = histogramAggHelper.processBucketAggregationResults(histogram, minimumMinutes * 60000, config); - // If rate is above success minimum then return true. - return fullBucketRate > TimeSeriesSettings.INTERVAL_BUCKET_MINIMUM_SUCCESS_RATE; - } - - } } diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/LatestTimeRetriever.java b/src/main/java/org/opensearch/timeseries/rest/handler/LatestTimeRetriever.java index 5d0393842..1086d6f76 100644 --- a/src/main/java/org/opensearch/timeseries/rest/handler/LatestTimeRetriever.java +++ b/src/main/java/org/opensearch/timeseries/rest/handler/LatestTimeRetriever.java @@ -5,7 +5,7 @@ package org.opensearch.timeseries.rest.handler; -import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -39,7 +39,7 @@ import org.opensearch.timeseries.AnalysisType; import org.opensearch.timeseries.feature.SearchFeatureDao; import org.opensearch.timeseries.model.Config; -import org.opensearch.timeseries.model.Entity; +import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.settings.TimeSeriesSettings; import org.opensearch.timeseries.util.SecurityClientUtil; @@ -49,8 +49,7 @@ public class LatestTimeRetriever { protected static final String AGG_NAME_TOP = "top_agg"; private final Config config; - // private final ActionListener> listener; - private final HistogramAggregationHelper histogramAggHelper; + private final AggregationPrep aggregationPrep; private final SecurityClientUtil clientUtil; private final Client client; private final User user; @@ -67,7 +66,7 @@ public LatestTimeRetriever( SearchFeatureDao searchFeatureDao ) { this.config = config; - this.histogramAggHelper = new HistogramAggregationHelper(config, requestTimeout); + this.aggregationPrep = new AggregationPrep(searchFeatureDao, requestTimeout, config); this.clientUtil = clientUtil; this.client = client; this.user = user; @@ -76,41 +75,32 @@ public LatestTimeRetriever( } /** - * Need to first check if HC analysis or not before retrieving latest date time. + * Need to first retrieve latest date time before checking if HC analysis or not. * If the config is HC then we will find the top entity and treat as single stream for * validation purposes * @param listener to return latest time and entity attributes if the config is HC */ public void checkIfHC(ActionListener, Map>> listener) { - ActionListener> topEntityListener = ActionListener - .wrap( - topEntity -> searchFeatureDao - .getLatestDataTime( - config, - Optional.of(Entity.createEntityByReordering(topEntity)), - context, - ActionListener.wrap(latestTime -> listener.onResponse(Pair.of(latestTime, topEntity)), listener::onFailure) - ), - exception -> { - listener.onFailure(exception); - logger.error("Failed to get top entity for categorical field", exception); - } - ); - if (config.isHighCardinality()) { - getTopEntity(topEntityListener); - } else { - topEntityListener.onResponse(Collections.emptyMap()); - } + searchFeatureDao.getLatestDataTime(config, Optional.empty(), context, ActionListener.wrap(latestTime -> { + if (latestTime.isEmpty()) { + listener.onResponse(Pair.of(Optional.empty(), Collections.emptyMap())); + } else if (config.isHighCardinality()) { + getTopEntity(listener, latestTime.get()); + } else { + listener.onResponse(Pair.of(latestTime, Collections.emptyMap())); + } + }, listener::onFailure)); } // For single category HCs, this method uses bucket aggregation and sort to get the category field // that have the highest document count in order to use that top entity for further validation // For multi-category HCs we use a composite aggregation to find the top fields for the entity // with the highest doc count. - public void getTopEntity(ActionListener> topEntityListener) { + public void getTopEntity(ActionListener, Map>> topEntityListener, long latestTimeMillis) { // Look at data back to the lower bound given the max interval we recommend or one given long maxIntervalInMinutes = Math.max(TimeSeriesSettings.MAX_INTERVAL_REC_LENGTH_IN_MINUTES, config.getIntervalInMinutes()); - LongBounds timeRangeBounds = histogramAggHelper.getTimeRangeBounds(Instant.now().toEpochMilli(), maxIntervalInMinutes * 60000); + LongBounds timeRangeBounds = aggregationPrep + .getTimeRangeBounds(new IntervalTimeConfiguration(maxIntervalInMinutes, ChronoUnit.MINUTES), latestTimeMillis); RangeQueryBuilder rangeQuery = new RangeQueryBuilder(config.getTimeField()) .from(timeRangeBounds.getMin()) .to(timeRangeBounds.getMax()); @@ -140,7 +130,8 @@ public void getTopEntity(ActionListener> topEntityListener) final ActionListener searchResponseListener = ActionListener.wrap(response -> { Aggregations aggs = response.getAggregations(); if (aggs == null) { - topEntityListener.onResponse(Collections.emptyMap()); + logger.warn("empty aggregation"); + topEntityListener.onResponse(Pair.of(Optional.empty(), Collections.emptyMap())); return; } if (config.getCategoryFields().size() == 1) { @@ -165,11 +156,11 @@ public void getTopEntity(ActionListener> topEntityListener) } for (Map.Entry entry : topKeys.entrySet()) { if (entry.getValue() == null) { - topEntityListener.onResponse(Collections.emptyMap()); + topEntityListener.onResponse(Pair.of(Optional.empty(), Collections.emptyMap())); return; } } - topEntityListener.onResponse(topKeys); + topEntityListener.onResponse(Pair.of(Optional.of(latestTimeMillis), topKeys)); }, topEntityListener::onFailure); // using the original context in listener as user roles have no permissions for internal operations like fetching a // checkpoint diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/ModelValidationActionHandler.java b/src/main/java/org/opensearch/timeseries/rest/handler/ModelValidationActionHandler.java index 7e49f3ead..092aa1a08 100644 --- a/src/main/java/org/opensearch/timeseries/rest/handler/ModelValidationActionHandler.java +++ b/src/main/java/org/opensearch/timeseries/rest/handler/ModelValidationActionHandler.java @@ -13,7 +13,7 @@ import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collections; -import java.util.List; +import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -35,7 +35,6 @@ import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.aggregations.AggregationBuilder; -import org.opensearch.search.aggregations.bucket.histogram.Histogram; import org.opensearch.search.aggregations.bucket.histogram.LongBounds; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.timeseries.AnalysisType; @@ -46,12 +45,12 @@ import org.opensearch.timeseries.model.Feature; import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.model.MergeableList; +import org.opensearch.timeseries.model.TimeConfiguration; import org.opensearch.timeseries.model.ValidationAspect; import org.opensearch.timeseries.model.ValidationIssueType; import org.opensearch.timeseries.settings.TimeSeriesSettings; import org.opensearch.timeseries.transport.ValidateConfigResponse; import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener; -import org.opensearch.timeseries.util.ParseUtils; import org.opensearch.timeseries.util.SecurityClientUtil; /** @@ -62,7 +61,6 @@ * different varying intervals in order to find the best interval for the data. If no interval is found with all * configuration applied then each configuration is tested sequentially for sparsity

*/ -// TODO: Add more UT and IT public class ModelValidationActionHandler { protected final Config config; @@ -78,11 +76,13 @@ public class ModelValidationActionHandler { protected final Settings settings; protected final User user; protected final AnalysisType context; - private final HistogramAggregationHelper histogramAggHelper; - private final IntervalCalculation intervalCalculation; + // private final HistogramAggregationHelper histogramAggHelper; + private final SearchFeatureDao searchFeatureDao; // time range bounds to verify configured interval makes sense or not private LongBounds timeRangeToSearchForConfiguredInterval; private final LatestTimeRetriever latestTimeRetriever; + private final ValidationIssueType intervalIssueType; + private AggregationPrep aggregationPrep; /** * Constructor function. @@ -100,6 +100,7 @@ public class ModelValidationActionHandler { * @param settings Node settings * @param user User info * @param context Analysis type + * @param intervalIssueType Interval validation issue type */ public ModelValidationActionHandler( ClusterService clusterService, @@ -114,7 +115,8 @@ public ModelValidationActionHandler( Clock clock, Settings settings, User user, - AnalysisType context + AnalysisType context, + ValidationIssueType intervalIssueType ) { this.clusterService = clusterService; this.client = client; @@ -123,16 +125,17 @@ public ModelValidationActionHandler( this.config = config; this.requestTimeout = requestTimeout; this.xContentRegistry = xContentRegistry; + this.searchFeatureDao = searchFeatureDao; this.validationType = validationType; this.clock = clock; this.settings = settings; this.user = user; this.context = context; - this.histogramAggHelper = new HistogramAggregationHelper(config, requestTimeout); - this.intervalCalculation = new IntervalCalculation(config, requestTimeout, client, clientUtil, user, context, clock); // calculate the bounds in a lazy manner this.timeRangeToSearchForConfiguredInterval = null; this.latestTimeRetriever = new LatestTimeRetriever(config, requestTimeout, clientUtil, client, user, context, searchFeatureDao); + this.intervalIssueType = intervalIssueType; + this.aggregationPrep = new AggregationPrep(searchFeatureDao, requestTimeout, config); } public void start() { @@ -170,15 +173,14 @@ private void getSampleRangesForValidationChecks( return; } long timeRangeEnd = Math.min(Instant.now().toEpochMilli(), latestTime.get()); - intervalCalculation + new IntervalCalculation(config, requestTimeout, client, clientUtil, user, context, clock, searchFeatureDao, timeRangeEnd, topEntity) .findInterval( - timeRangeEnd, - topEntity, - ActionListener.wrap(interval -> processIntervalRecommendation(interval, latestTime.get()), listener::onFailure) + ActionListener.wrap(interval -> processIntervalRecommendation(interval, latestTime.get(), topEntity), listener::onFailure) ); } - private void processIntervalRecommendation(IntervalTimeConfiguration interval, long latestTime) { + private void processIntervalRecommendation(IntervalTimeConfiguration interval, long latestTime, Map topEntity) + throws IOException { // if interval suggestion is null that means no interval could be found with all the configurations // applied, our next step then is to check density just with the raw data and then add each configuration // one at a time to try and find root cause of low density @@ -186,9 +188,9 @@ private void processIntervalRecommendation(IntervalTimeConfiguration interval, l checkRawDataSparsity(latestTime); } else { if (((IntervalTimeConfiguration) config.getInterval()).gte(interval)) { - logger.info("Using the current interval there is enough dense data "); + logger.info("Using the current interval as there is enough dense data "); // Check if there is a window delay recommendation if everything else is successful and send exception - if (Instant.now().toEpochMilli() - latestTime > histogramAggHelper.timeConfigToMilliSec(config.getWindowDelay())) { + if (Instant.now().toEpochMilli() - latestTime > timeConfigToMilliSec(config.getWindowDelay())) { sendWindowDelayRec(latestTime); return; } @@ -201,7 +203,7 @@ private void processIntervalRecommendation(IntervalTimeConfiguration interval, l .onFailure( new ValidationException( CommonMessages.INTERVAL_REC + interval.getInterval(), - ValidationIssueType.DETECTION_INTERVAL, + intervalIssueType, ValidationAspect.MODEL, interval ) @@ -209,14 +211,14 @@ private void processIntervalRecommendation(IntervalTimeConfiguration interval, l } } - public AggregationBuilder getBucketAggregation(long latestTime) { + private AggregationBuilder getBucketAggregation(long latestTime) { IntervalTimeConfiguration interval = (IntervalTimeConfiguration) config.getInterval(); long intervalInMinutes = IntervalTimeConfiguration.getIntervalInMinute(interval); if (timeRangeToSearchForConfiguredInterval == null) { - timeRangeToSearchForConfiguredInterval = histogramAggHelper.getTimeRangeBounds(latestTime, intervalInMinutes * 60000); + timeRangeToSearchForConfiguredInterval = aggregationPrep.getTimeRangeBounds(interval, latestTime); } - return histogramAggHelper.getBucketAggregation((int) intervalInMinutes, timeRangeToSearchForConfiguredInterval); + return aggregationPrep.getHistogramAggregation((int) intervalInMinutes, timeRangeToSearchForConfiguredInterval); } private void checkRawDataSparsity(long latestTime) { @@ -238,24 +240,8 @@ private void checkRawDataSparsity(long latestTime) { ); } - public double processBucketAggregationResults(Histogram buckets, long latestTime) { - long intervalInMillis = config.getIntervalInMilliseconds(); - return histogramAggHelper.processBucketAggregationResults(buckets, intervalInMillis, config); - } - private void processRawDataResults(SearchResponse response, long latestTime) { - Histogram aggregate = null; - try { - aggregate = histogramAggHelper.checkBucketResultErrors(response); - } catch (ValidationException e) { - listener.onFailure(e); - } - - if (aggregate == null) { - return; - } - double fullBucketRate = processBucketAggregationResults(aggregate, latestTime); - if (fullBucketRate < TimeSeriesSettings.INTERVAL_BUCKET_MINIMUM_SUCCESS_RATE) { + if (aggregationPrep.getHistorgramBucketHitRate(response) < TimeSeriesSettings.INTERVAL_BUCKET_MINIMUM_SUCCESS_RATE) { listener .onFailure( new ValidationException(CommonMessages.RAW_DATA_TOO_SPARSE, ValidationIssueType.INDICES, ValidationAspect.MODEL) @@ -268,7 +254,7 @@ private void processRawDataResults(SearchResponse response, long latestTime) { private void checkDataFilterSparsity(long latestTime) { AggregationBuilder aggregation = getBucketAggregation(latestTime); BoolQueryBuilder query = QueryBuilders.boolQuery().filter(config.getFilterQuery()); - SearchSourceBuilder searchSourceBuilder = histogramAggHelper.getSearchSourceBuilder(query, aggregation); + SearchSourceBuilder searchSourceBuilder = aggregationPrep.getSearchSourceBuilder(query, aggregation); SearchRequest searchRequest = new SearchRequest(config.getIndices().toArray(new String[0])).source(searchSourceBuilder); final ActionListener searchResponseListener = ActionListener .wrap(response -> processDataFilterResults(response, latestTime), listener::onFailure); @@ -286,18 +272,7 @@ private void checkDataFilterSparsity(long latestTime) { } private void processDataFilterResults(SearchResponse response, long latestTime) { - Histogram aggregate = null; - try { - aggregate = histogramAggHelper.checkBucketResultErrors(response); - } catch (ValidationException e) { - listener.onFailure(e); - } - - if (aggregate == null) { - return; - } - double fullBucketRate = processBucketAggregationResults(aggregate, latestTime); - if (fullBucketRate < CONFIG_BUCKET_MINIMUM_SUCCESS_RATE) { + if (aggregationPrep.getHistorgramBucketHitRate(response) < CONFIG_BUCKET_MINIMUM_SUCCESS_RATE) { listener .onFailure( new ValidationException( @@ -313,7 +288,7 @@ private void processDataFilterResults(SearchResponse response, long latestTime) getTopEntityForCategoryField(latestTime); } else { try { - checkFeatureQueryDelegate(latestTime); + checkFeatureQueryDelegate(latestTime, new HashMap<>()); } catch (Exception ex) { logger.error(ex); listener.onFailure(ex); @@ -322,13 +297,13 @@ private void processDataFilterResults(SearchResponse response, long latestTime) } private void getTopEntityForCategoryField(long latestTime) { - ActionListener> getTopEntityListener = ActionListener - .wrap(topEntity -> checkCategoryFieldSparsity(topEntity, latestTime), exception -> { + ActionListener, Map>> getTopEntityListener = ActionListener + .wrap(topEntity -> checkCategoryFieldSparsity(topEntity.getRight(), latestTime), exception -> { listener.onFailure(exception); logger.error("Failed to get top entity for categorical field", exception); return; }); - latestTimeRetriever.getTopEntity(getTopEntityListener); + latestTimeRetriever.getTopEntity(getTopEntityListener, latestTime); } private void checkCategoryFieldSparsity(Map topEntity, long latestTime) { @@ -337,10 +312,10 @@ private void checkCategoryFieldSparsity(Map topEntity, long late query.filter(QueryBuilders.termQuery(entry.getKey(), entry.getValue())); } AggregationBuilder aggregation = getBucketAggregation(latestTime); - SearchSourceBuilder searchSourceBuilder = histogramAggHelper.getSearchSourceBuilder(query, aggregation); + SearchSourceBuilder searchSourceBuilder = aggregationPrep.getSearchSourceBuilder(query, aggregation); SearchRequest searchRequest = new SearchRequest(config.getIndices().toArray(new String[0])).source(searchSourceBuilder); final ActionListener searchResponseListener = ActionListener - .wrap(response -> processTopEntityResults(response, latestTime), listener::onFailure); + .wrap(response -> processTopEntityResults(response, latestTime, topEntity), listener::onFailure); // using the original context in listener as user roles have no permissions for internal operations like fetching a // checkpoint clientUtil @@ -354,26 +329,15 @@ private void checkCategoryFieldSparsity(Map topEntity, long late ); } - private void processTopEntityResults(SearchResponse response, long latestTime) { - Histogram aggregate = null; - try { - aggregate = histogramAggHelper.checkBucketResultErrors(response); - } catch (ValidationException e) { - listener.onFailure(e); - } - - if (aggregate == null) { - return; - } - double fullBucketRate = processBucketAggregationResults(aggregate, latestTime); - if (fullBucketRate < CONFIG_BUCKET_MINIMUM_SUCCESS_RATE) { + private void processTopEntityResults(SearchResponse response, long latestTime, Map topEntity) { + if (aggregationPrep.getHistorgramBucketHitRate(response) < CONFIG_BUCKET_MINIMUM_SUCCESS_RATE) { listener .onFailure( new ValidationException(CommonMessages.CATEGORY_FIELD_TOO_SPARSE, ValidationIssueType.CATEGORY, ValidationAspect.MODEL) ); } else { try { - checkFeatureQueryDelegate(latestTime); + checkFeatureQueryDelegate(latestTime, topEntity); } catch (Exception ex) { logger.error(ex); listener.onFailure(ex); @@ -381,7 +345,20 @@ private void processTopEntityResults(SearchResponse response, long latestTime) { } } - private void checkFeatureQueryDelegate(long latestTime) throws IOException { + private void checkFeatureQueryDelegate(long latestTime, Map topEntity) throws IOException { + if (config.isHighCardinality()) { + if (topEntity.isEmpty()) { + listener + .onFailure( + new ValidationException( + CommonMessages.CATEGORY_FIELD_TOO_SPARSE, + ValidationIssueType.CATEGORY, + ValidationAspect.MODEL + ) + ); + return; + } + } ActionListener> validateFeatureQueriesListener = ActionListener.wrap(response -> { windowDelayRecommendation(latestTime); }, exception -> { @@ -395,28 +372,24 @@ private void checkFeatureQueryDelegate(long latestTime) throws IOException { CommonMessages.FEATURE_QUERY_TOO_SPARSE, false ); - - for (Feature feature : config.getFeatureAttributes()) { - AggregationBuilder aggregation = getBucketAggregation(latestTime); - BoolQueryBuilder query = QueryBuilders.boolQuery().filter(config.getFilterQuery()); - List featureFields = ParseUtils.getFieldNamesForFeature(feature, xContentRegistry); - for (String featureField : featureFields) { - query.filter(QueryBuilders.existsQuery(featureField)); - } - SearchSourceBuilder searchSourceBuilder = histogramAggHelper.getSearchSourceBuilder(query, aggregation); - SearchRequest searchRequest = new SearchRequest(config.getIndices().toArray(new String[0])).source(searchSourceBuilder); + for (int i = 0; i < config.getFeatureAttributes().size(); i++) { + final Feature feature = config.getFeatureAttributes().get(i); + IntervalTimeConfiguration interval = (IntervalTimeConfiguration) config.getInterval(); + SearchRequest searchRequest = aggregationPrep + .createSearchRequestForFeature(interval, aggregationPrep.getTimeRangeBounds(interval, latestTime), topEntity, i); final ActionListener searchResponseListener = ActionListener.wrap(response -> { try { - Histogram aggregate = histogramAggHelper.checkBucketResultErrors(response); - if (aggregate == null) { - return; - } - double fullBucketRate = processBucketAggregationResults(aggregate, latestTime); + double fullBucketRate = aggregationPrep.getBucketHitRate(response, interval, latestTime); if (fullBucketRate < CONFIG_BUCKET_MINIMUM_SUCCESS_RATE) { multiFeatureQueriesResponseListener .onFailure( new ValidationException( - CommonMessages.FEATURE_QUERY_TOO_SPARSE, + // BaseValidateConfigTransportAction.getFeatureSubIssuesFromErrorMessage assumes feature issue has a + // colon and would reverse the message and erroneous feature name. + // Need to follow the same convention. This convention has been followed by other cases in + // AbstractTimeSeriesActionHandler.validateConfigFeatures + // like "Feature has an invalid query returning empty aggregated data: max1" + String.format(Locale.ROOT, "%s: %s", CommonMessages.FEATURE_QUERY_TOO_SPARSE, feature.getName()), ValidationIssueType.FEATURE_ATTRIBUTES, ValidationAspect.MODEL ) @@ -464,7 +437,7 @@ private void sendWindowDelayRec(long latestTimeInMillis) { private void windowDelayRecommendation(long latestTime) { // Check if there is a better window-delay to recommend and if one was recommended // then send exception and return, otherwise continue to let user know data is too sparse as explained below - if (Instant.now().toEpochMilli() - latestTime > histogramAggHelper.timeConfigToMilliSec(config.getWindowDelay())) { + if (Instant.now().toEpochMilli() - latestTime > timeConfigToMilliSec(config.getWindowDelay())) { sendWindowDelayRec(latestTime); return; } @@ -479,4 +452,7 @@ private void windowDelayRecommendation(long latestTime) { .onFailure(new ValidationException(CommonMessages.RAW_DATA_TOO_SPARSE, ValidationIssueType.INDICES, ValidationAspect.MODEL)); } + private Long timeConfigToMilliSec(TimeConfiguration timeConfig) { + return Optional.ofNullable((IntervalTimeConfiguration) timeConfig).map(t -> t.toDuration().toMillis()).orElse(0L); + } } diff --git a/src/main/java/org/opensearch/timeseries/transport/BaseSuggestConfigParamTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/BaseSuggestConfigParamTransportAction.java index 3bc2decb1..77a35cb29 100644 --- a/src/main/java/org/opensearch/timeseries/transport/BaseSuggestConfigParamTransportAction.java +++ b/src/main/java/org/opensearch/timeseries/transport/BaseSuggestConfigParamTransportAction.java @@ -111,7 +111,6 @@ public void resolveUserAndExecute(User requestedUser, ActionListener listener) { - IntervalCalculation intervalCalculation = new IntervalCalculation(config, timeout, client, clientUtil, user, context, clock); LatestTimeRetriever latestTimeRetriever = new LatestTimeRetriever( config, timeout, @@ -130,7 +129,19 @@ protected void suggestInterval(Config config, User user, TimeValue timeout, Acti ActionListener, Map>> latestTimeListener = ActionListener.wrap(latestEntityAttributes -> { Optional latestTime = latestEntityAttributes.getLeft(); if (latestTime.isPresent()) { - intervalCalculation.findInterval(latestTime.get(), latestEntityAttributes.getRight(), intervalSuggestionListener); + IntervalCalculation intervalCalculation = new IntervalCalculation( + config, + timeout, + client, + clientUtil, + user, + context, + clock, + searchFeatureDao, + latestTime.get(), + latestEntityAttributes.getRight() + ); + intervalCalculation.findInterval(intervalSuggestionListener); } else { listener.onFailure(new TimeSeriesException("Empty data. Cannot find a good interval.")); } diff --git a/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java index aa1930f59..dca5d9ee1 100644 --- a/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java +++ b/src/main/java/org/opensearch/timeseries/transport/BaseValidateConfigTransportAction.java @@ -18,7 +18,6 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.ad.constant.ADCommonMessages; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Setting; @@ -33,6 +32,7 @@ import org.opensearch.tasks.Task; import org.opensearch.timeseries.common.exception.TimeSeriesException; import org.opensearch.timeseries.common.exception.ValidationException; +import org.opensearch.timeseries.constant.CommonMessages; import org.opensearch.timeseries.feature.SearchFeatureDao; import org.opensearch.timeseries.function.ExecutorFunction; import org.opensearch.timeseries.indices.IndexManagement; @@ -60,6 +60,7 @@ public abstract class BaseValidateConfigTransportAction filterByBackendRoleSetting + Setting filterByBackendRoleSetting, + ValidationAspect validationAspect ) { super(actionName, transportService, actionFilters, ValidateConfigRequest::new); this.client = client; @@ -85,6 +87,7 @@ public BaseValidateConfigTransportAction( this.searchFeatureDao = searchFeatureDao; this.clock = Clock.systemUTC(); this.settings = settings; + this.validationAspect = validationAspect; } @Override @@ -101,7 +104,7 @@ protected void doExecute(Task task, ValidateConfigRequest request, ActionListene public void resolveUserAndExecute(User requestedUser, ActionListener listener, ExecutorFunction function) { try { // Check if user has backend roles - // When filter by is enabled, block users validating detectors who do not have backend roles. + // When filter by is enabled, block users validating configs who do not have backend roles. if (filterByEnabled) { String error = checkFilterByBackendRoles(requestedUser); if (error != null) { @@ -127,10 +130,10 @@ protected void checkIndicesAndExecute( client.search(searchRequest, ActionListener.wrap(r -> function.execute(), e -> { if (e instanceof IndexNotFoundException) { // IndexNotFoundException is converted to a ADValidationException that gets - // parsed to a DetectorValidationIssue that is returned to + // parsed to a ValidationIssue that is returned to // the user as a response indicating index doesn't exist ConfigValidationIssue issue = parseValidationException( - new ValidationException(ADCommonMessages.INDEX_NOT_FOUND, ValidationIssueType.INDICES, ValidationAspect.DETECTOR) + new ValidationException(CommonMessages.INDEX_NOT_FOUND, ValidationIssueType.INDICES, validationAspect) ); listener.onResponse(new ValidateConfigResponse(issue)); return; @@ -144,7 +147,10 @@ protected Map getFeatureSubIssuesFromErrorMessage(String errorMe Map result = new HashMap<>(); String[] subIssueMessagesSuffix = errorMessage.split(", "); for (int i = 0; i < subIssueMessagesSuffix.length; i++) { - result.put(subIssueMessagesSuffix[i].split(": ")[1], subIssueMessagesSuffix[i].split(": ")[0]); + String[] subIssueMsgs = subIssueMessagesSuffix[i].split(": "); + // e.g., key: value: Feature max1 has issue: Data is most likely too sparse when given feature queries are applied. Consider + // revising feature queries. + result.put(subIssueMsgs[1], subIssueMsgs[0]); } return result; } @@ -224,5 +230,5 @@ public void validateExecute( }, listener); } - protected abstract Processor createProcessor(Config detector, ValidateConfigRequest request, User user); + protected abstract Processor createProcessor(Config config, ValidateConfigRequest request, User user); } diff --git a/src/main/java/org/opensearch/timeseries/util/ParseUtils.java b/src/main/java/org/opensearch/timeseries/util/ParseUtils.java index 5be698e9b..119e21dab 100644 --- a/src/main/java/org/opensearch/timeseries/util/ParseUtils.java +++ b/src/main/java/org/opensearch/timeseries/util/ParseUtils.java @@ -390,6 +390,42 @@ public static SearchSourceBuilder generateColdStartQuery( return new SearchSourceBuilder().query(internalFilterQuery).size(0).aggregation(dateRangeBuilder); } + public static SearchSourceBuilder generateColdStartQueryForSingleFeature( + Config config, + List> ranges, + Optional entity, + NamedXContentRegistry xContentRegistry, + int featureIndex + ) throws IOException { + + BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().filter(config.getFilterQuery()); + + if (entity.isPresent()) { + for (TermQueryBuilder term : entity.get().getTermQueryForCustomerIndex()) { + internalFilterQuery.filter(term); + } + } + + DateRangeAggregationBuilder dateRangeBuilder = dateRange("date_range").field(config.getTimeField()).format("epoch_millis"); + for (Entry range : ranges) { + dateRangeBuilder.addRange(range.getKey(), range.getValue()); + } + + if (config.getFeatureAttributes() != null) { + Feature feature = config.getFeatureAttributes().get(featureIndex); + AggregatorFactories.Builder internalAgg = parseAggregators( + feature.getAggregation().toString(), + xContentRegistry, + feature.getId() + ); + dateRangeBuilder.subAggregation(internalAgg.getAggregatorFactories().iterator().next()); + } else { + throw new IllegalArgumentException("empty feature"); + } + + return new SearchSourceBuilder().query(internalFilterQuery).size(0).aggregation(dateRangeBuilder); + } + /** * Map feature data to its Id and name * @param currentFeature Feature data @@ -802,6 +838,13 @@ public static List getFeatureFieldNames(Config config, NamedXContentRegi return featureFields; } + /** + * This works only when the query is a simple aggregation. It won't work for aggregation with a filter. + * @param feature Feature in AD + * @param xContentRegistry used to parse xcontent + * @return parsed field name + * @throws IOException when parsing fails + */ public static List getFieldNamesForFeature(Feature feature, NamedXContentRegistry xContentRegistry) throws IOException { ParseUtils.parseAggregators(feature.getAggregation().toString(), xContentRegistry, feature.getId()); XContentParser parser = XContentType.JSON diff --git a/src/main/java/org/opensearch/timeseries/util/RestHandlerUtils.java b/src/main/java/org/opensearch/timeseries/util/RestHandlerUtils.java index e8c02b838..ec45ff02c 100644 --- a/src/main/java/org/opensearch/timeseries/util/RestHandlerUtils.java +++ b/src/main/java/org/opensearch/timeseries/util/RestHandlerUtils.java @@ -158,7 +158,7 @@ public static String checkFeaturesSyntax(Config config, int maxFeatures) { List features = config.getFeatureAttributes(); if (features != null) { if (features.size() > maxFeatures) { - return "Can't create more than " + maxFeatures + " features"; + return "Can't create more than " + maxFeatures + " feature(s)"; } return validateFeaturesConfig(config.getFeatureAttributes()); } diff --git a/src/test/java/org/opensearch/ad/ml/AbstractCosineDataTest.java b/src/test/java/org/opensearch/ad/ml/AbstractCosineDataTest.java index 0002fcd69..0f7716bce 100644 --- a/src/test/java/org/opensearch/ad/ml/AbstractCosineDataTest.java +++ b/src/test/java/org/opensearch/ad/ml/AbstractCosineDataTest.java @@ -14,11 +14,14 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.time.Clock; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -37,6 +40,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.threadpool.ThreadPool; import org.opensearch.timeseries.AbstractTimeSeriesTest; @@ -54,6 +58,7 @@ import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.settings.TimeSeriesSettings; import org.opensearch.timeseries.util.ClientUtil; +import org.opensearch.timeseries.util.SecurityClientUtil; import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest; import com.google.common.collect.ImmutableList; @@ -89,6 +94,8 @@ public class AbstractCosineDataTest extends AbstractTimeSeriesTest { Set> nodestateSetting; int detectorInterval = 1; int shingleSize; + Client client; + SecurityClientUtil securityCientUtil; @SuppressWarnings("unchecked") @Override @@ -104,9 +111,9 @@ public void setUp() throws Exception { threadPool = mock(ThreadPool.class); setUpADThreadPool(threadPool); - settings = Settings.EMPTY; + settings = Settings.builder().put(AnomalyDetectorSettings.AD_CHECKPOINT_SAVING_FREQ.getKey(), TimeValue.timeValueHours(12)).build(); - Client client = mock(Client.class); + client = mock(Client.class); clientUtil = mock(ClientUtil.class); detector = TestHelpers.AnomalyDetectorBuilder @@ -137,7 +144,37 @@ public void setUp() throws Exception { imputer = new LinearUniformImputer(true); - searchFeatureDao = mock(SearchFeatureDao.class); + ClusterSettings clusterSettings = new ClusterSettings( + settings, + Collections + .unmodifiableSet( + new HashSet<>( + Arrays + .asList( + AnomalyDetectorSettings.MAX_ENTITIES_FOR_PREVIEW, + AnomalyDetectorSettings.AD_PAGE_SIZE, + AnomalyDetectorSettings.AD_CHECKPOINT_SAVING_FREQ + ) + ) + ) + ); + clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + securityCientUtil = new SecurityClientUtil(stateManager, settings); + searchFeatureDao = spy( + new SearchFeatureDao( + client, + xContentRegistry(), // Important. Without this, ParseUtils cannot parse anything + securityCientUtil, + clusterService, + TimeSeriesSettings.NUM_SAMPLES_PER_TREE, + clock, + 1, + 1, + 60_000L + ) + ); featureManager = new FeatureManager( searchFeatureDao, diff --git a/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java b/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java index 8c1932dc0..3feccd298 100644 --- a/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java +++ b/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java @@ -948,7 +948,6 @@ private void accuracyTemplate(int detectorIntervalMins, float precisionThreshold // training data ranges from timestamps[0] ~ timestamps[trainTestSplit-1] doAnswer(invocation -> { - GetRequest request = invocation.getArgument(0); ActionListener listener = invocation.getArgument(2); listener.onResponse(TestHelpers.createGetResponse(detector, detector.getId(), CommonName.CONFIG_INDEX)); diff --git a/src/test/java/org/opensearch/ad/ml/HCADModelPerfTests.java b/src/test/java/org/opensearch/ad/ml/HCADModelPerfTests.java index 5ebc7ba43..1329b01d5 100644 --- a/src/test/java/org/opensearch/ad/ml/HCADModelPerfTests.java +++ b/src/test/java/org/opensearch/ad/ml/HCADModelPerfTests.java @@ -17,6 +17,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.time.Clock; @@ -114,7 +115,19 @@ private void averageAccuracyTemplate( long seed = z; LOG.info("seed = " + seed); // recreate in each loop; otherwise, we will have heap overflow issue. - searchFeatureDao = mock(SearchFeatureDao.class); + searchFeatureDao = spy( + new SearchFeatureDao( + client, + xContentRegistry(), // Important. Without this, ParseUtils cannot parse anything + securityCientUtil, + clusterService, + TimeSeriesSettings.NUM_SAMPLES_PER_TREE, + clock, + 1, + 1, + 60_000L + ) + ); featureManager = new FeatureManager( searchFeatureDao, diff --git a/src/test/java/org/opensearch/ad/rest/DataDependentADRestApiIT.java b/src/test/java/org/opensearch/ad/rest/DataDependentADRestApiIT.java new file mode 100644 index 000000000..2be8a59b6 --- /dev/null +++ b/src/test/java/org/opensearch/ad/rest/DataDependentADRestApiIT.java @@ -0,0 +1,167 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ad.rest; + +import static org.opensearch.timeseries.util.RestHandlerUtils.VALIDATE; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Locale; +import java.util.Map; + +import org.opensearch.ad.AbstractADSyntheticDataTest; +import org.opensearch.client.Response; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.timeseries.TestHelpers; +import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin; + +import com.google.common.collect.ImmutableMap; + +/** + * Different from AnomalyDetectorRestApiIT, this IT focuses on tests requiring delicates setup of data like validate api. + * + */ +public class DataDependentADRestApiIT extends AbstractADSyntheticDataTest { + private static final String VALIDATE_DETECTOR_MODEL; + + static { + VALIDATE_DETECTOR_MODEL = String + .format(Locale.ROOT, "%s/%s/%s", TimeSeriesAnalyticsPlugin.AD_BASE_DETECTORS_URI, VALIDATE, "model"); + } + + public void testTwoFeatureSparse() throws Exception { + + Instant trainTime = loadRuleData(200); + + // case 1: both filters in features cause sparsity + String detectorDef = "{\n" + + " \"name\": \"Second-Test-Detector-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"max1\",\n" + + " \"feature_name\": \"max1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"filtered_max_1\": {\n" + + " \"filter\": {\n" + + " \"bool\": {\n" + + " \"must\": [\n" + + " {\n" + + " \"range\": {\n" + + " \"timestamp\": {\n" + + " \"lt\": %d\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"max1\": {\n" + + " \"max\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " },\n" + + " {\n" + + " \"feature_id\": \"max2\",\n" + + " \"feature_name\": \"max2\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"filtered_max_2\": {\n" + + " \"filter\": {\n" + + " \"bool\": {\n" + + " \"must\": [\n" + + " {\n" + + " \"range\": {\n" + + " \"timestamp\": {\n" + + " \"lt\": %d\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"max2\": {\n" + + " \"max\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": %d,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"detection_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 10,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " }\n" + + "}"; + + // +1 to make sure it is big enough + long windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes() + 1; + // we have 100 timestamps (2 entities per timestamp). Timestamps are 10 minutes apart. If we subtract 70 * 10 = 700 minutes, we have + // sparse data. + long featureFilter = trainTime.minus(700, ChronoUnit.MINUTES).toEpochMilli(); + String formattedDetector = String + .format(Locale.ROOT, detectorDef, RULE_DATASET_NAME, featureFilter, featureFilter, windowDelayMinutes); + Response response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_DETECTOR_MODEL), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedDetector), + null + ); + assertEquals("Validate forecaster interval failed", RestStatus.OK, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + Map validation = (Map) ((Map) responseMap.get("model")).get("feature_attributes"); + String msg = (String) validation.get("message"); + // due to concurrency, feature order can change. + assertTrue( + "Data is most likely too sparse when given feature queries are applied. Consider revising feature queries: max1, Data is most likely too sparse when given feature queries are applied. Consider revising feature queries: max2" + .equals(msg) + || "Data is most likely too sparse when given feature queries are applied. Consider revising feature queries: max2, Data is most likely too sparse when given feature queries are applied. Consider revising feature queries: max1" + .equals(msg) + ); + Map subIssues = (Map) validation.get("sub_issues"); + assertEquals( + "Data is most likely too sparse when given feature queries are applied. Consider revising feature queries", + subIssues.get("max1") + ); + assertEquals( + "Data is most likely too sparse when given feature queries are applied. Consider revising feature queries", + subIssues.get("max2") + ); + } +} diff --git a/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java index d31190b9c..7ba4680e7 100644 --- a/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/ValidateAnomalyDetectorTransportActionTests.java @@ -19,7 +19,6 @@ import org.junit.Test; import org.opensearch.ad.ADIntegTestCase; -import org.opensearch.ad.constant.ADCommonMessages; import org.opensearch.ad.constant.ADCommonName; import org.opensearch.ad.indices.ADIndexManagement; import org.opensearch.ad.model.AnomalyDetector; @@ -79,7 +78,7 @@ public void testValidateAnomalyDetectorWithNoIndexFound() throws IOException { assertNotNull(response.getIssue()); assertEquals(ValidationIssueType.INDICES, response.getIssue().getType()); assertEquals(ValidationAspect.DETECTOR, response.getIssue().getAspect()); - assertTrue(response.getIssue().getMessage().contains(ADCommonMessages.INDEX_NOT_FOUND)); + assertTrue(response.getIssue().getMessage().contains(CommonMessages.INDEX_NOT_FOUND)); } @Test diff --git a/src/test/java/org/opensearch/ad/util/RestHandlerUtilsTests.java b/src/test/java/org/opensearch/ad/util/RestHandlerUtilsTests.java index e557545fb..f7041f550 100644 --- a/src/test/java/org/opensearch/ad/util/RestHandlerUtilsTests.java +++ b/src/test/java/org/opensearch/ad/util/RestHandlerUtilsTests.java @@ -88,7 +88,7 @@ public void testisExceptionCausedByInvalidQueryNotSearchPhaseException() { public void testValidateAnomalyDetectorWithTooManyFeatures() throws IOException { AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableList.of(randomFeature(), randomFeature())); String error = RestHandlerUtils.checkFeaturesSyntax(detector, 1); - assertEquals("Can't create more than 1 features", error); + assertEquals("Can't create more than 1 feature(s)", error); } public void testValidateAnomalyDetectorWithDuplicateFeatureNames() throws IOException { diff --git a/src/test/java/org/opensearch/forecast/AbstractForecastSyntheticDataTest.java b/src/test/java/org/opensearch/forecast/AbstractForecastSyntheticDataTest.java index a2575a457..2546d3c2b 100644 --- a/src/test/java/org/opensearch/forecast/AbstractForecastSyntheticDataTest.java +++ b/src/test/java/org/opensearch/forecast/AbstractForecastSyntheticDataTest.java @@ -5,53 +5,6 @@ package org.opensearch.forecast; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.List; - import org.opensearch.timeseries.AbstractSyntheticDataTest; -import com.google.gson.Gson; -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; -import com.google.gson.stream.JsonReader; - -public class AbstractForecastSyntheticDataTest extends AbstractSyntheticDataTest { - /** - * Read data from a json array file up to a specified size - * @param datasetFileName data set file name - * @param size the limit of json elements to read - * @return the read JsonObject list - * @throws URISyntaxException when failing to find datasetFileName - * @throws Exception when there is a parsing error. - */ - public static List readJsonArrayWithLimit(String datasetFileName, int limit) throws URISyntaxException { - List jsonObjects = new ArrayList<>(); - - try ( - FileReader fileReader = new FileReader( - new File(AbstractForecastSyntheticDataTest.class.getClassLoader().getResource(datasetFileName).toURI()), - Charset.defaultCharset() - ); - JsonReader jsonReader = new JsonReader(fileReader) - ) { - - Gson gson = new Gson(); - JsonArray jsonArray = gson.fromJson(jsonReader, JsonArray.class); - - for (int i = 0; i < limit && i < jsonArray.size(); i++) { - JsonObject jsonObject = jsonArray.get(i).getAsJsonObject(); - jsonObjects.add(jsonObject); - } - - } catch (IOException e) { - LOG.error("fail to read json array", e); - } - - return jsonObjects; - } -} +public class AbstractForecastSyntheticDataTest extends AbstractSyntheticDataTest {} diff --git a/src/test/java/org/opensearch/forecast/rest/ForecastRestApiIT.java b/src/test/java/org/opensearch/forecast/rest/ForecastRestApiIT.java index 8776ec971..43e011dba 100644 --- a/src/test/java/org/opensearch/forecast/rest/ForecastRestApiIT.java +++ b/src/test/java/org/opensearch/forecast/rest/ForecastRestApiIT.java @@ -5,12 +5,20 @@ package org.opensearch.forecast.rest; +import static org.hamcrest.Matchers.containsString; import static org.opensearch.timeseries.util.RestHandlerUtils.SUGGEST; +import static org.opensearch.timeseries.util.RestHandlerUtils.VALIDATE; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.hamcrest.MatcherAssert; import org.junit.Before; import org.opensearch.client.Response; import org.opensearch.client.ResponseException; @@ -30,13 +38,16 @@ /** * Test the following Restful API: * - Suggest + * - Validate + * - Create * */ public class ForecastRestApiIT extends AbstractForecastSyntheticDataTest { - private static final String SYNTHETIC_DATASET_NAME = "synthetic"; - private static final String RULE_DATASET_NAME = "rule"; private static final String SUGGEST_INTERVAL_URI; private static final String SUGGEST_INTERVAL_HORIZON_HISTORY_URI; + private static final String VALIDATE_FORECASTER; + private static final String VALIDATE_FORECASTER_MODEL; + private static final String CREATE_FORECASTER; static { SUGGEST_INTERVAL_URI = String @@ -57,6 +68,10 @@ public class ForecastRestApiIT extends AbstractForecastSyntheticDataTest { Forecaster.HORIZON_FIELD, Config.HISTORY_INTERVAL_FIELD ); + VALIDATE_FORECASTER = String.format(Locale.ROOT, "%s/%s", TimeSeriesAnalyticsPlugin.FORECAST_FORECASTERS_URI, VALIDATE); + VALIDATE_FORECASTER_MODEL = String + .format(Locale.ROOT, "%s/%s/%s", TimeSeriesAnalyticsPlugin.FORECAST_FORECASTERS_URI, VALIDATE, "model"); + CREATE_FORECASTER = TimeSeriesAnalyticsPlugin.FORECAST_FORECASTERS_URI; } @Override @@ -66,16 +81,63 @@ public void setUp() throws Exception { updateClusterSettings(ForecastEnabledSetting.FORECAST_ENABLED, true); } - private static void loadData(String datasetName, int trainTestSplit) throws Exception { + /** + * manipulate the data to have three categorical fields. Originally, the data file has one categorical field with two possible values. + * Simulate we have enough raw data but the categorical value caused sparsity. + * @param trainTestSplit the number of rows to load + * @return train time + * @throws Exception when failing to ingest data + */ + private static Instant loadSparseCategoryData(int trainTestSplit) throws Exception { RestClient client = client(); - String dataFileName = String.format(Locale.ROOT, "org/opensearch/ad/e2e/data/%s.data", datasetName); - + String dataFileName = String.format(Locale.ROOT, "org/opensearch/ad/e2e/data/%s.data", RULE_DATASET_NAME); List data = readJsonArrayWithLimit(dataFileName, trainTestSplit); - String mapping = "{ \"mappings\": { \"properties\": { \"timestamp\": { \"type\": \"date\"}," - + " \"Feature1\": { \"type\": \"double\" }, \"Feature2\": { \"type\": \"double\" } } } }"; - bulkIndexTrainData(datasetName, data, trainTestSplit, client, mapping); + // Deep copy the list using Gson + + String mapping = "{ \"mappings\": { \"properties\": { \"timestamp\": { \"type\":" + + "\"date\"" + + "}," + + " \"transform._doc_count\": { \"type\": \"integer\" }," + + "\"component1Name\": { \"type\": \"keyword\"}," + + "\"component2Name\": { \"type\": \"keyword\"}" + + "} } }"; + int phonenixIndex = 0; + int scottsdaleIndex = 0; + for (int i = 0; i < trainTestSplit; i++) { + JsonObject row = data.get(i); + + // Get the value of the "componentName" field + String componentName = row.get("componentName").getAsString(); + + // Replace the field based on the value of "componentName" + row.remove("componentName"); // Remove the original "componentName" field + + if ("Phoenix".equals(componentName)) { + if (phonenixIndex % 2 == 0) { + row.addProperty("component1Name", "server1"); + row.addProperty("component2Name", "app1"); + } else { + row.addProperty("component1Name", "server2"); + row.addProperty("component2Name", "app1"); + } + phonenixIndex++; + } else if ("Scottsdale".equals(componentName)) { + if (scottsdaleIndex % 2 == 0) { + row.addProperty("component1Name", "server3"); + row.addProperty("component2Name", "app2"); + } else { + row.addProperty("component1Name", "server4"); + row.addProperty("component2Name", "app2"); + } + scottsdaleIndex++; + } + } + + bulkIndexTrainData(RULE_DATASET_NAME, data, trainTestSplit, client, mapping); + String trainTimeStr = data.get(trainTestSplit - 1).get("timestamp").getAsString(); + return Instant.ofEpochMilli(Long.parseLong(trainTimeStr)); } /** @@ -86,7 +148,7 @@ private static void loadData(String datasetName, int trainTestSplit) throws Exce * @throws Exception when loading data */ public void testSuggestOneMinute() throws Exception { - loadData(SYNTHETIC_DATASET_NAME, 200); + loadSyntheticData(200); // case 1: suggest 1 minute interval for a time series with 1 minute cadence String forecasterDef = "{\n" + " \"name\": \"Second-Test-Forecaster-4\",\n" @@ -266,7 +328,7 @@ public void testSuggestOneMinute() throws Exception { } public void testSuggestTenMinute() throws Exception { - loadData(RULE_DATASET_NAME, 200); + loadRuleData(200); // case 1: The following request validates against the source data to see if model training might succeed. In this example, the data // is ingested at a rate of every ten minutes. final String forecasterDef = "{\n" @@ -397,7 +459,7 @@ public void testSuggestTenMinute() throws Exception { } public void testSuggestSparseData() throws Exception { - loadData(SYNTHETIC_DATASET_NAME, 10); + loadSyntheticData(10); // case 1: suggest 1 minute interval for a time series with 1 minute cadence String forecasterDef = "{\n" + " \"name\": \"Second-Test-Forecaster-4\",\n" @@ -452,4 +514,1353 @@ public void testSuggestSparseData() throws Exception { // no suggestion assertEquals(0, responseMap.size()); } + + public void testValidate() throws Exception { + loadSyntheticData(200); + // case 1: forecaster interval is not set + String forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"Feature1\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": 20,\n" + + " \"unit\": \"SECONDS\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24\n" + + "}"; + + String formattedForecaster = String.format(Locale.ROOT, forecasterDef, SYNTHETIC_DATASET_NAME); + + Response response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster failed", RestStatus.OK, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + Map validations = (Map) ((Map) responseMap.get("forecaster")) + .get("forecast_interval"); + assertEquals("Forecast interval should be set", validations.get("message")); + + // case 2: if there is no problem, nothing shows + forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"Feature1\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": 20,\n" + + " \"unit\": \"SECONDS\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 1,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " }\n" + + "}"; + + formattedForecaster = String.format(Locale.ROOT, forecasterDef, SYNTHETIC_DATASET_NAME); + + response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster failed", RestStatus.OK, TestHelpers.restStatus(response)); + responseMap = entityAsMap(response); + // no issues + assertEquals(0, responseMap.size()); + + // case 3: the feature query aggregates over a field that doesn’t exist in the data source: + // Note: this only works for aggregation without a default value. For sum/count, we won't + // detect as OpenSearch still returns default value. + forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"max1\",\n" + + " \"feature_name\": \"max1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"max1\": {\n" + + " \"max\": {\n" + + " \"field\": \"Feature10\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": 20,\n" + + " \"unit\": \"SECONDS\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 1,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " }\n" + + "}"; + + formattedForecaster = String.format(Locale.ROOT, forecasterDef, SYNTHETIC_DATASET_NAME); + + response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster failed", RestStatus.OK, TestHelpers.restStatus(response)); + responseMap = entityAsMap(response); + validations = (Map) ((Map) responseMap.get("forecaster")).get("feature_attributes"); + assertEquals("Feature has an invalid query returning empty aggregated data: max1", validations.get("message")); + assertEquals( + "Feature has an invalid query returning empty aggregated data", + ((Map) validations.get("sub_issues")).get("max1") + ); + } + + public void testValidateSparseData() throws Exception { + Instant trainTime = loadSyntheticData(10); + long windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes(); + + // case 1 : sparse data + String forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"Feature1\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": %d,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 1,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " }\n" + + "}"; + + String formattedForecaster = String.format(Locale.ROOT, forecasterDef, SYNTHETIC_DATASET_NAME, windowDelayMinutes); + + Response response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER_MODEL), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster model failed", RestStatus.OK, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + Map validations = (Map) ((Map) responseMap.get("model")).get("indices"); + assertEquals( + "Source index data is potentially too sparse for model training. Consider changing interval length or ingesting more data", + validations.get("message") + ); + } + + public void testValidateTenMinute() throws Exception { + Instant trainTime = loadRuleData(200); + + // case 1: The following request validates against the source data to see if model training might succeed. + // In this example, the data is ingested at a rate of every 10 minutes, and forecaster interval is set to 1 minute. + String forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": 20,\n" + + " \"unit\": \"SECONDS\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 1,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " }\n" + + "}"; + + String formattedForecaster = String.format(Locale.ROOT, forecasterDef, RULE_DATASET_NAME); + + Response response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER_MODEL), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster interval failed", RestStatus.OK, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + Map validation = (Map) ((Map) responseMap.get("model")).get("forecast_interval"); + assertEquals( + "The selected interval might collect sparse data. Consider changing interval length to: 10", + validation.get("message") + ); + Map suggested = (Map) ((Map) validation.get("suggested_value")).get("period"); + assertEquals("Minutes", suggested.get("unit")); + assertEquals(10, (int) suggested.get("interval")); + + // case 2: Another scenario might indicate that you can change filter_query (data filter) because the currently filtered data is too + // sparse for + // the model to train correctly, which can happen because the index is also ingesting data that falls outside the chosen filter. + // Using another + // filter_query can make your data more dense. + forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": 20,\n" + + " \"unit\": \"SECONDS\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 10,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"filter_query\": {\n" + + " \"bool\": {\n" + + " \"filter\": [\n" + + " {\n" + + " \"range\": {\n" + + " \"@timestamp\": {\n" + + " \"lt\": 1\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"adjust_pure_negative\": true,\n" + + " \"boost\": 1\n" + + " }\n" + + " }\n" + + "}"; + + formattedForecaster = String.format(Locale.ROOT, forecasterDef, RULE_DATASET_NAME); + + response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER_MODEL), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster interval failed", RestStatus.OK, TestHelpers.restStatus(response)); + responseMap = entityAsMap(response); + validation = (Map) ((Map) responseMap.get("model")).get("filter_query"); + assertEquals("Data is too sparse after data filter is applied. Consider changing the data filter", validation.get("message")); + + // case 3: window delay too small + forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": 20,\n" + + " \"unit\": \"SECONDS\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 10,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " }\n" + + "}"; + + formattedForecaster = String.format(Locale.ROOT, forecasterDef, RULE_DATASET_NAME); + + long windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes(); + + response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER_MODEL), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster interval failed", RestStatus.OK, TestHelpers.restStatus(response)); + responseMap = entityAsMap(response); + validation = (Map) ((Map) responseMap.get("model")).get("window_delay"); + + String regex = "Latest seen data point is at least \\d+ minutes ago. Consider changing window delay to at least \\d+ minutes."; + + // Compile the pattern + Pattern pattern = Pattern.compile(regex); + + String validationMsg = (String) validation.get("message"); + Matcher matcher = pattern.matcher(validationMsg); + assertTrue("Message does not match the expected pattern.", matcher.matches()); + + Map suggestions = (Map) ((Map) validation.get("suggested_value")).get("period"); + assertTrue("should be at least " + windowDelayMinutes, (int) suggestions.get("interval") >= windowDelayMinutes); + assertEquals("should be Minutes", "Minutes", suggestions.get("unit")); + + // case 4: feature query format invalid + forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"filter\": {\n" + + " \"bool\": {\n" + + " \"must\": [\n" + + " {\n" + + " \"range\": {\n" + + " \"@timestamp\": {\n" + + " \"lt\": 1\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": 20,\n" + + " \"unit\": \"SECONDS\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 10,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " }\n" + + "}"; + + formattedForecaster = String.format(Locale.ROOT, forecasterDef, RULE_DATASET_NAME); + + response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER_MODEL), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster interval failed", RestStatus.OK, TestHelpers.restStatus(response)); + responseMap = entityAsMap(response); + validation = (Map) ((Map) responseMap.get("forecaster")).get("feature_attributes"); + assertEquals("Custom query error: Unknown aggregation type [bool]", validation.get("message")); + + // case 5: filter in feature query causes empty data + forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"max1\",\n" + + " \"feature_name\": \"max1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"filtered_max_1\": {\n" + + " \"filter\": {\n" + + " \"bool\": {\n" + + " \"must\": [\n" + + " {\n" + + " \"range\": {\n" + + " \"timestamp\": {\n" + + " \"lt\": 1\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"max1\": {\n" + + " \"max\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": %d,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 10,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " }\n" + + "}"; + + // +1 to make sure it is big enough + windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes() + 1; + formattedForecaster = String.format(Locale.ROOT, forecasterDef, RULE_DATASET_NAME, windowDelayMinutes); + response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER_MODEL), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster interval failed", RestStatus.OK, TestHelpers.restStatus(response)); + responseMap = entityAsMap(response); + validation = (Map) ((Map) responseMap.get("forecaster")).get("feature_attributes"); + assertEquals("Feature has an invalid query returning empty aggregated data: max1", validation.get("message")); + + // case 6: wrong field name in feature query causes empty data + // Note we cannot deal with aggregation with default value like sum. + forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"max1\",\n" + + " \"feature_name\": \"max1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"max1\": {\n" + + " \"max\": {\n" + + " \"field\": \"transform._doc_count2\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": %d,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 10,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " }\n" + + "}"; + + // +1 to make sure it is big enough + windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes() + 1; + formattedForecaster = String.format(Locale.ROOT, forecasterDef, RULE_DATASET_NAME, windowDelayMinutes); + response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER_MODEL), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster interval failed", RestStatus.OK, TestHelpers.restStatus(response)); + responseMap = entityAsMap(response); + validation = (Map) ((Map) responseMap.get("forecaster")).get("feature_attributes"); + assertEquals("Feature has an invalid query returning empty aggregated data: max1", validation.get("message")); + + // case 7: filter in feature query causes sparse data + forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"max1\",\n" + + " \"feature_name\": \"max1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"filtered_max_1\": {\n" + + " \"filter\": {\n" + + " \"bool\": {\n" + + " \"must\": [\n" + + " {\n" + + " \"range\": {\n" + + " \"timestamp\": {\n" + + " \"lt\": %d\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"max1\": {\n" + + " \"max\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": %d,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 10,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " }\n" + + "}"; + + // +1 to make sure it is big enough + windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes() + 1; + // we have 100 timestamps (2 entities per timestamp). Timestamps are 10 minutes apart. If we subtract 70 * 10 = 700 minutes, we have + // sparse data. + formattedForecaster = String + .format( + Locale.ROOT, + forecasterDef, + RULE_DATASET_NAME, + trainTime.minus(700, ChronoUnit.MINUTES).toEpochMilli(), + windowDelayMinutes + ); + response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER_MODEL), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster interval failed", RestStatus.OK, TestHelpers.restStatus(response)); + responseMap = entityAsMap(response); + validation = (Map) ((Map) responseMap.get("model")).get("feature_attributes"); + assertEquals( + "Data is most likely too sparse when given feature queries are applied. Consider revising feature queries: max1", + validation.get("message") + ); + assertEquals( + "Data is most likely too sparse when given feature queries are applied. Consider revising feature queries", + ((Map) validation.get("sub_issues")).get("max1") + ); + + // case 8: two features will fail + forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"max1\",\n" + + " \"feature_name\": \"max1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"filtered_max_1\": {\n" + + " \"filter\": {\n" + + " \"bool\": {\n" + + " \"must\": [\n" + + " {\n" + + " \"range\": {\n" + + " \"timestamp\": {\n" + + " \"lt\": %d\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"max1\": {\n" + + " \"max\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " },\n" + + " {\n" + + " \"feature_id\": \"max2\",\n" + + " \"feature_name\": \"max2\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"filtered_max_2\": {\n" + + " \"filter\": {\n" + + " \"bool\": {\n" + + " \"must\": [\n" + + " {\n" + + " \"range\": {\n" + + " \"timestamp\": {\n" + + " \"lt\": %d\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"max2\": {\n" + + " \"max\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": %d,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 10,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " }\n" + + "}"; + + // +1 to make sure it is big enough + windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes() + 1; + // we have 100 timestamps (2 entities per timestamp). Timestamps are 10 minutes apart. If we subtract 70 * 10 = 700 minutes, we have + // sparse data. + long filterTimestamp = trainTime.minus(700, ChronoUnit.MINUTES).toEpochMilli(); + formattedForecaster = String + .format(Locale.ROOT, forecasterDef, RULE_DATASET_NAME, filterTimestamp, filterTimestamp, windowDelayMinutes); + response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER_MODEL), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster interval failed", RestStatus.OK, TestHelpers.restStatus(response)); + responseMap = entityAsMap(response); + validation = (Map) ((Map) responseMap.get("forecaster")).get("feature_attributes"); + assertEquals("Can't create more than 1 feature(s)", validation.get("message")); + } + + public void testValidateHC() throws Exception { + Instant trainTime = loadSparseCategoryData(82); + // add 2 to be safe + long windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes() + 2; + + // case 1: index does not exist + String forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": 20,\n" + + " \"unit\": \"SECONDS\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 10,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"category_field\": [\"%s\"]\n" + + "}"; + + String formattedForecaster = String + .format(Locale.ROOT, forecasterDef, SYNTHETIC_DATASET_NAME, windowDelayMinutes, "component1Name,component2Name"); + + Response response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER_MODEL), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster model failed", RestStatus.OK, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + Map validations = (Map) ((Map) responseMap.get("forecaster")).get("indices"); + assertEquals("index does not exist", validations.get("message")); + + // case 2: invalid categorical field + forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": %d,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 10,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"category_field\": [\"%s\"]\n" + + "}"; + + formattedForecaster = String.format(Locale.ROOT, forecasterDef, RULE_DATASET_NAME, windowDelayMinutes, "476465"); + + response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER_MODEL), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster model failed", RestStatus.OK, TestHelpers.restStatus(response)); + responseMap = entityAsMap(response); + validations = (Map) ((Map) responseMap.get("forecaster")).get("category_field"); + assertEquals("Can't find the categorical field 476465", validations.get("message")); + + // case 3: validate data sparsity with one categorical field + forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": %d,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 10,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"category_field\": [\"%s\"]\n" + + "}"; + + formattedForecaster = String.format(Locale.ROOT, forecasterDef, RULE_DATASET_NAME, windowDelayMinutes, "component1Name"); + + response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER_MODEL), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster model failed", RestStatus.OK, TestHelpers.restStatus(response)); + responseMap = entityAsMap(response); + validations = (Map) ((Map) responseMap.get("model")).get("category_field"); + assertEquals( + "Data is most likely too sparse with the given category fields. Consider revising category field/s or ingesting more data.", + validations.get("message") + ); + + // case 4: validate data sparsity with two categorical fields + forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": %d,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 10,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"category_field\": [\"%s\", \"%s\"]\n" + + "}"; + + formattedForecaster = String + .format(Locale.ROOT, forecasterDef, RULE_DATASET_NAME, windowDelayMinutes, "component1Name", "component2Name"); + + response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, VALIDATE_FORECASTER_MODEL), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Validate forecaster model failed", RestStatus.OK, TestHelpers.restStatus(response)); + responseMap = entityAsMap(response); + validations = (Map) ((Map) responseMap.get("model")).get("category_field"); + assertEquals( + "Data is most likely too sparse with the given category fields. Consider revising category field/s or ingesting more data.", + validations.get("message") + ); + } + + public void testCreate() throws Exception { + Instant trainTime = loadRuleData(200); + // case 1: create two features will fail + String forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"max1\",\n" + + " \"feature_name\": \"max1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"filtered_max_1\": {\n" + + " \"filter\": {\n" + + " \"bool\": {\n" + + " \"must\": [\n" + + " {\n" + + " \"range\": {\n" + + " \"timestamp\": {\n" + + " \"lt\": %d\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"max1\": {\n" + + " \"max\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " },\n" + + " {\n" + + " \"feature_id\": \"max2\",\n" + + " \"feature_name\": \"max2\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"filtered_max_2\": {\n" + + " \"filter\": {\n" + + " \"bool\": {\n" + + " \"must\": [\n" + + " {\n" + + " \"range\": {\n" + + " \"timestamp\": {\n" + + " \"lt\": %d\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"max2\": {\n" + + " \"max\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": %d,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 10,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " }\n" + + "}"; + + // +1 to make sure it is big enough + long windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes() + 1; + // we have 100 timestamps (2 entities per timestamp). Timestamps are 10 minutes apart. If we subtract 70 * 10 = 700 minutes, we have + // sparse data. + long filterTimestamp = trainTime.minus(700, ChronoUnit.MINUTES).toEpochMilli(); + final String formattedForecaster = String + .format(Locale.ROOT, forecasterDef, RULE_DATASET_NAME, filterTimestamp, filterTimestamp, windowDelayMinutes); + Exception ex = expectThrows( + ResponseException.class, + () -> TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, CREATE_FORECASTER),// VALIDATE_FORECASTER_MODEL), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ) + ); + MatcherAssert.assertThat(ex.getMessage(), containsString("Can't create more than 1 feature(s)")); + + // case 2: create forecaster with custom index + forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"max1\",\n" + + " \"feature_name\": \"max1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"filtered_max_1\": {\n" + + " \"filter\": {\n" + + " \"bool\": {\n" + + " \"must\": [\n" + + " {\n" + + " \"range\": {\n" + + " \"timestamp\": {\n" + + " \"lt\": %d\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " },\n" + + " \"aggregations\": {\n" + + " \"max1\": {\n" + + " \"max\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": %d,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 10,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " },\n" + + " \"result_index\": \"opensearch-forecast-result-b\"\n" + + "}"; + + // +1 to make sure it is big enough + windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes() + 1; + // we have 100 timestamps (2 entities per timestamp). Timestamps are 10 minutes apart. If we subtract 70 * 10 = 700 minutes, we have + // sparse data. + String formattedForecaster2 = String.format(Locale.ROOT, forecasterDef, RULE_DATASET_NAME, filterTimestamp, windowDelayMinutes); + Response response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, CREATE_FORECASTER), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster2), + null + ); + Map responseMap = entityAsMap(response); + assertEquals("opensearch-forecast-result-b", ((Map) responseMap.get("forecaster")).get("result_index")); + } } diff --git a/src/test/java/org/opensearch/timeseries/AbstractSyntheticDataTest.java b/src/test/java/org/opensearch/timeseries/AbstractSyntheticDataTest.java index 929670d2b..c10ada512 100644 --- a/src/test/java/org/opensearch/timeseries/AbstractSyntheticDataTest.java +++ b/src/test/java/org/opensearch/timeseries/AbstractSyntheticDataTest.java @@ -7,9 +7,14 @@ import static org.opensearch.timeseries.TestHelpers.toHttpEntity; +import java.io.File; +import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; +import java.net.URISyntaxException; import java.nio.charset.Charset; +import java.time.Instant; +import java.util.ArrayList; import java.util.List; import java.util.Locale; @@ -27,12 +32,23 @@ import org.opensearch.timeseries.settings.TimeSeriesSettings; import com.google.common.collect.ImmutableList; +import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import com.google.gson.stream.JsonReader; public class AbstractSyntheticDataTest extends ODFERestTestCase { public static final Logger LOG = (Logger) LogManager.getLogger(AbstractSyntheticDataTest.class); + public static final String SYNTHETIC_DATA_MAPPING = "{ \"mappings\": { \"properties\": { \"timestamp\": { \"type\": \"date\"}," + + " \"Feature1\": { \"type\": \"double\" }, \"Feature2\": { \"type\": \"double\" } } } }"; + public static final String RULE_DATA_MAPPING = "{ \"mappings\": { \"properties\": { \"timestamp\": { \"type\":" + + "\"date\"" + + "}," + + " \"transform._doc_count\": { \"type\": \"integer\" }," + + "\"componentName\": { \"type\": \"keyword\"} } } }"; + public static final String SYNTHETIC_DATASET_NAME = "synthetic"; + public static final String RULE_DATASET_NAME = "rule"; /** * In real time AD, we mute a node for a detector if that node keeps returning @@ -154,4 +170,81 @@ public static void setWarningHandler(Request request, boolean strictDeprecationM request.setOptions(options.build()); } + /** + * Read data from a json array file up to a specified size + * @param datasetFileName data set file name + * @param size the limit of json elements to read + * @return the read JsonObject list + * @throws URISyntaxException when failing to find datasetFileName + * @throws Exception when there is a parsing error. + */ + public static List readJsonArrayWithLimit(String datasetFileName, int limit) throws URISyntaxException { + List jsonObjects = new ArrayList<>(); + try ( + FileReader fileReader = new FileReader( + new File(AbstractSyntheticDataTest.class.getClassLoader().getResource(datasetFileName).toURI()), + Charset.defaultCharset() + ); + JsonReader jsonReader = new JsonReader(fileReader) + ) { + + Gson gson = new Gson(); + JsonArray jsonArray = gson.fromJson(jsonReader, JsonArray.class); + + for (int i = 0; i < limit && i < jsonArray.size(); i++) { + JsonObject jsonObject = jsonArray.get(i).getAsJsonObject(); + jsonObjects.add(jsonObject); + } + + } catch (IOException e) { + LOG.error("fail to read json array", e); + } + return jsonObjects; + } + + /** + * + * @param datasetName Data set name + * @param trainTestSplit the number of rows in training data + * @return train time + * @throws Exception when failing to ingest data + */ + private static Instant loadData(String datasetName, int trainTestSplit, String mapping) throws Exception { + RestClient client = client(); + + String dataFileName = String.format(Locale.ROOT, "org/opensearch/ad/e2e/data/%s.data", datasetName); + + List data = readJsonArrayWithLimit(dataFileName, trainTestSplit); + + bulkIndexTrainData(datasetName, data, trainTestSplit, client, mapping); + String trainTimeStr = data.get(trainTestSplit - 1).get("timestamp").getAsString(); + if (canBeParsedAsLong(trainTimeStr)) { + return Instant.ofEpochMilli(Long.parseLong(trainTimeStr)); + } else { + return Instant.parse(trainTimeStr); + } + + } + + protected static Instant loadSyntheticData(int trainTestSplit) throws Exception { + return loadData(SYNTHETIC_DATASET_NAME, trainTestSplit, SYNTHETIC_DATA_MAPPING); + } + + protected static Instant loadRuleData(int trainTestSplit) throws Exception { + return loadData(RULE_DATASET_NAME, trainTestSplit, RULE_DATA_MAPPING); + } + + public static boolean canBeParsedAsLong(String str) { + if (str == null || str.isEmpty()) { + return false; // Handle null or empty strings as not parsable + } + + try { + Long.parseLong(str); + return true; + } catch (NumberFormatException e) { + return false; + } + } + } diff --git a/src/test/java/org/opensearch/timeseries/feature/NoPowermockSearchFeatureDaoTests.java b/src/test/java/org/opensearch/timeseries/feature/NoPowermockSearchFeatureDaoTests.java index 40f3ae359..47118ca8a 100644 --- a/src/test/java/org/opensearch/timeseries/feature/NoPowermockSearchFeatureDaoTests.java +++ b/src/test/java/org/opensearch/timeseries/feature/NoPowermockSearchFeatureDaoTests.java @@ -184,7 +184,6 @@ public void setUp() throws Exception { client, xContentRegistry(), // Important. Without this, ParseUtils cannot parse anything clientUtil, - settings, clusterService, TimeSeriesSettings.NUM_SAMPLES_PER_TREE, clock, @@ -370,7 +369,6 @@ public void testGetHighestCountEntitiesExhaustedPages() throws InterruptedExcept client, xContentRegistry(), clientUtil, - settings, clusterService, TimeSeriesSettings.NUM_SAMPLES_PER_TREE, clock, @@ -415,7 +413,6 @@ public void testGetHighestCountEntitiesNotEnoughTime() throws InterruptedExcepti client, xContentRegistry(), clientUtil, - settings, clusterService, TimeSeriesSettings.NUM_SAMPLES_PER_TREE, clock, diff --git a/src/test/java/org/opensearch/timeseries/indices/rest/handler/HistogramAggregationHelperTests.java b/src/test/java/org/opensearch/timeseries/indices/rest/handler/HistogramAggregationHelperTests.java new file mode 100644 index 000000000..310fb64b9 --- /dev/null +++ b/src/test/java/org/opensearch/timeseries/indices/rest/handler/HistogramAggregationHelperTests.java @@ -0,0 +1,109 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.timeseries.indices.rest.handler; + +import static org.mockito.Mockito.*; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.search.aggregations.Aggregations; +import org.opensearch.search.aggregations.bucket.histogram.Histogram; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.timeseries.common.exception.ValidationException; +import org.opensearch.timeseries.constant.CommonMessages; +import org.opensearch.timeseries.feature.SearchFeatureDao; +import org.opensearch.timeseries.model.Config; +import org.opensearch.timeseries.model.ValidationAspect; +import org.opensearch.timeseries.model.ValidationIssueType; +import org.opensearch.timeseries.rest.handler.AggregationPrep; + +public class HistogramAggregationHelperTests extends OpenSearchTestCase { + + @Mock + private Config mockConfig; + + @Mock + private SearchResponse mockSearchResponse; + + @Mock + private Aggregations mockAggregations; + + private AggregationPrep histogramAggregationHelper; + + @Mock + private SearchFeatureDao searchFeatureDao; + + @Override + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + MockitoAnnotations.openMocks(this); + TimeValue requestTimeout = TimeValue.timeValueMinutes(1); + histogramAggregationHelper = new AggregationPrep(searchFeatureDao, requestTimeout, mockConfig); + } + + public void testCheckBucketResultErrors_NullAggregations() { + when(mockSearchResponse.getAggregations()).thenReturn(null); + + ValidationException exception = assertThrows(ValidationException.class, () -> { + histogramAggregationHelper.validateAndRetrieveHistogramAggregation(mockSearchResponse); + }); + + verify(mockSearchResponse).getAggregations(); + assert(exception.getMessage().contains(CommonMessages.MODEL_VALIDATION_FAILED_UNEXPECTEDLY)); + assert(exception.getType() == ValidationIssueType.AGGREGATION); + assert(exception.getAspect() == ValidationAspect.MODEL); + } + + public void testCheckBucketResultErrors_NullAggregationResult() { + when(mockSearchResponse.getAggregations()).thenReturn(mockAggregations); + Histogram histogram = mock(Histogram.class); + when(histogram.getName()).thenReturn("blah"); + + List histogramBuckets = Arrays.asList(); + when((List)histogram.getBuckets()).thenReturn(histogramBuckets); + + Aggregations searchAggs = new Aggregations(Collections.singletonList(histogram)); + + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.status()).thenReturn(RestStatus.OK); + when(searchResponse.getScrollId()).thenReturn(randomAlphaOfLength(1000)); + when(searchResponse.getAggregations()).thenReturn(searchAggs); + when(searchResponse.getTook()).thenReturn(TimeValue.timeValueMillis(randomNonNegativeLong())); + + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { + histogramAggregationHelper.validateAndRetrieveHistogramAggregation(searchResponse); + }); + + assert(exception.getMessage().contains("Failed to find valid aggregation result")); + } + + public void testConvertKeyToEpochMillis_Double() { + Double key = 1234567890.0; + long expected = 1234567890L; + + long result = AggregationPrep.convertKeyToEpochMillis(key); + + assertEquals("The conversion from Double to long epoch millis is incorrect", expected, result); + } + + public void testConvertKeyToEpochMillis_Long() { + Long key = 1234567890L; + long expected = 1234567890L; + + long result = AggregationPrep.convertKeyToEpochMillis(key); + + assertEquals("The conversion from Long to long epoch millis is incorrect", expected, result); + } +}