diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 9c5797f1d..8a16b0790 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -58,12 +58,12 @@ jobs: su `id -un 1000` -c "./gradlew ':test' --tests 'org.opensearch.ad.ml.HCADModelPerfTests' \ -Dtests.seed=2AEBDBBAE75AC5E0 -Dtests.security.manager=false \ -Dtests.locale=es-CU -Dtests.timezone=Chile/EasterIsland -Dtest.logs=true \ - -Dmodel-benchmark=true" + -Dtests.timeoutSuite=3600000! -Dmodel-benchmark=true" ;; single_stream) su `id -un 1000` -c "./gradlew integTest --tests 'org.opensearch.ad.e2e.SingleStreamModelPerfIT' \ -Dtests.seed=60CDDB34427ACD0C -Dtests.security.manager=false \ -Dtests.locale=kab-DZ -Dtests.timezone=Asia/Hebron -Dtest.logs=true \ - -Dmodel-benchmark=true" + -Dtests.timeoutSuite=3600000! -Dmodel-benchmark=true" ;; esac diff --git a/.github/workflows/test_security.yml b/.github/workflows/test_security.yml index 935b6d166..1f9ce669b 100644 --- a/.github/workflows/test_security.yml +++ b/.github/workflows/test_security.yml @@ -37,9 +37,9 @@ jobs: - name: Pull and Run Docker run: | plugin=`basename $(ls build/distributions/*.zip)` - version=`echo $plugin|awk -F- '{print $5}'| cut -d. -f 1-3` - plugin_version=`echo $plugin|awk -F- '{print $5}'| cut -d. -f 1-4` - qualifier=`echo $plugin|awk -F- '{print $6}'| cut -d. -f 1-1` + version=`echo $plugin|awk -F- '{print $4}'| cut -d. -f 1-3` + plugin_version=`echo $plugin|awk -F- '{print $4}'| cut -d. -f 1-4` + qualifier=`echo $plugin|awk -F- '{print $5}'| cut -d. -f 1-1` if $qualifier!=SNAPSHOT then diff --git a/build-tools/coverage.gradle b/build-tools/coverage.gradle new file mode 100644 index 000000000..eb3582dab --- /dev/null +++ b/build-tools/coverage.gradle @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +apply plugin: 'jacoco' + +jacoco { + toolVersion = "0.8.10" +} + +/** + * This code sets up coverage reporting manually for the k-NN plugin tests. This is complicated because: + * 1. The OS integTest Task doesn't implement Gradle's JavaForkOptions so we have to manually start the jacoco agent with the test JVM + * 2. The cluster nodes are stopped using 'kill -9' which means jacoco can't dump it's execution output to a file on VM shutdown + * 3. The Java Security Manager prevents JMX from writing execution output to the file. + * + * To workaround these we start the cluster with jmx enabled and then use Jacoco's JMX MBean to get the execution data before the + * cluster is stopped and dump it to a file. Luckily our current security policy seems to allow this. This will also probably + * break if there are multiple nodes in the integTestCluster. But for now... it sorta works. + */ +integTest { + jacoco { + jmx = true + } + + systemProperty 'jacoco.dir', project.layout.buildDirectory.get().file("jacoco").asFile.absolutePath + systemProperty 'jmx.serviceUrl', "service:jmx:rmi:///jndi/rmi://127.0.0.1:7777/jmxrmi" +} + +jacocoTestReport { + dependsOn integTest, test + executionData.from = [integTest.jacoco.destinationFile, test.jacoco.destinationFile] + reports { + html.getRequired().set(true) // human readable + csv.getRequired().set(true) + xml.getRequired().set(true) // for coverlay + } +} + +testClusters.integTest { + jvmArgs " ${integTest.jacoco.getAsJvmArg()}" + + systemProperty 'com.sun.management.jmxremote', "true" + systemProperty 'com.sun.management.jmxremote.authenticate', "false" + systemProperty 'com.sun.management.jmxremote.port', "7777" + systemProperty 'com.sun.management.jmxremote.ssl', "false" + systemProperty 'java.rmi.server.hostname', "127.0.0.1" +} diff --git a/build.gradle b/build.gradle index 052e0f536..b31561343 100644 --- a/build.gradle +++ b/build.gradle @@ -35,7 +35,7 @@ buildscript { js_resource_folder = "src/test/resources/job-scheduler" common_utils_version = System.getProperty("common_utils.version", opensearch_build) job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build) - bwcVersionShort = "2.15.0" + bwcVersionShort = "2.16.0" bwcVersion = bwcVersionShort + ".0" bwcOpenSearchADDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' + 'opensearch/plugins/opensearch-anomaly-detection-' + bwcVersion + '.zip' @@ -662,6 +662,13 @@ task release(type: Copy, group: 'build') { eachFile { it.path = it.path - "opensearch/" } } +def usingRemoteCluster = System.properties.containsKey('tests.rest.cluster') || System.properties.containsKey('tests.cluster') +def usingMultiNode = project.properties.containsKey('numNodes') +// Only apply jacoco test coverage if we are running a local single node cluster +if (!usingRemoteCluster && !usingMultiNode) { + apply from: 'build-tools/coverage.gradle' +} + List jacocoExclusions = [ // code for configuration, settings, etc is excluded from coverage 'org.opensearch.timeseries.TimeSeriesAnalyticsPlugin', @@ -701,6 +708,8 @@ List jacocoExclusions = [ jacocoTestCoverageVerification { + dependsOn(jacocoTestReport) + executionData.from = [integTest.jacoco.destinationFile, test.jacoco.destinationFile] violationRules { rule { element = 'CLASS' @@ -722,13 +731,6 @@ jacocoTestCoverageVerification { } } -jacocoTestReport { - reports { - xml.required = true // for coverlay - html.required = true // human readable - } -} - check.dependsOn jacocoTestCoverageVerification jacocoTestCoverageVerification.dependsOn jacocoTestReport diff --git a/dataGeneration/requirements.txt b/dataGeneration/requirements.txt index 54f597d98..11750d9ca 100644 --- a/dataGeneration/requirements.txt +++ b/dataGeneration/requirements.txt @@ -4,4 +4,4 @@ numpy==1.23.0 opensearch_py==2.0.0 retry==0.9.2 scipy==1.10.0 -urllib3==1.26.18 \ No newline at end of file +urllib3==1.26.19 \ No newline at end of file diff --git a/src/main/java/org/opensearch/forecast/settings/ForecastEnabledSetting.java b/src/main/java/org/opensearch/forecast/settings/ForecastEnabledSetting.java index b1635f9d3..4984b822d 100644 --- a/src/main/java/org/opensearch/forecast/settings/ForecastEnabledSetting.java +++ b/src/main/java/org/opensearch/forecast/settings/ForecastEnabledSetting.java @@ -27,7 +27,8 @@ public class ForecastEnabledSetting extends DynamicNumericSetting { */ public static final String FORECAST_ENABLED = "plugins.forecast.enabled"; - public static final boolean enabled = false; + // TODO: remove the field when releasing forecasting + public static boolean enabled = false; public static final Map> settings = unmodifiableMap(new HashMap>() { { @@ -55,8 +56,10 @@ public static synchronized ForecastEnabledSetting getInstance() { * @return whether forecasting is enabled. */ public static boolean isForecastEnabled() { - // return ForecastEnabledSetting.getInstance().getSettingValue(ForecastEnabledSetting.FORECAST_ENABLED); + // keep the dynamic setting in main branch to enable writing tests. + // will make it hardcoded false in the 2.x branch. + return ForecastEnabledSetting.getInstance().getSettingValue(ForecastEnabledSetting.FORECAST_ENABLED); // TODO: enable forecasting before released - return enabled; + // return enabled; } } diff --git a/src/main/java/org/opensearch/timeseries/JobProcessor.java b/src/main/java/org/opensearch/timeseries/JobProcessor.java index 03a555845..f9b4863e9 100644 --- a/src/main/java/org/opensearch/timeseries/JobProcessor.java +++ b/src/main/java/org/opensearch/timeseries/JobProcessor.java @@ -579,5 +579,9 @@ private void releaseLock(Job jobParameter, LockService lockService, LockModel lo ); } + public Integer getEndRunExceptionCount(String configId) { + return endRunExceptionCount.getOrDefault(configId, 0); + } + protected abstract ResultRequest createResultRequest(String configID, long start, long end); } diff --git a/src/main/java/org/opensearch/timeseries/util/TaskUtil.java b/src/main/java/org/opensearch/timeseries/util/TaskUtil.java deleted file mode 100644 index a92d81043..000000000 --- a/src/main/java/org/opensearch/timeseries/util/TaskUtil.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.timeseries.util; - -import static org.opensearch.ad.model.ADTaskType.ALL_HISTORICAL_TASK_TYPES; -import static org.opensearch.ad.model.ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES; -import static org.opensearch.ad.model.ADTaskType.REALTIME_TASK_TYPES; - -import java.util.List; - -import org.opensearch.forecast.model.ForecastTaskType; -import org.opensearch.timeseries.AnalysisType; -import org.opensearch.timeseries.model.DateRange; -import org.opensearch.timeseries.model.TaskType; - -public class TaskUtil { - public static List getTaskTypes(DateRange dateRange, boolean resetLatestTaskStateFlag, AnalysisType analysisType) { - if (analysisType == AnalysisType.FORECAST) { - if (dateRange == null) { - return ForecastTaskType.REALTIME_TASK_TYPES; - } else { - throw new UnsupportedOperationException("Forecasting does not support historical tasks"); - } - } else { - if (dateRange == null) { - return REALTIME_TASK_TYPES; - } else { - if (resetLatestTaskStateFlag) { - // return all task types include HC entity task to make sure we can reset all tasks latest flag - return ALL_HISTORICAL_TASK_TYPES; - } else { - return HISTORICAL_DETECTOR_TASK_TYPES; - } - } - } - - } -} diff --git a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java index 9679c1ce1..e1e86e1ec 100644 --- a/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java +++ b/src/test/java/org/opensearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java @@ -122,8 +122,6 @@ public void setUp() throws Exception { clientUtil = new SecurityClientUtil(nodeStateManager, settings); transportService = mock(TransportService.class); - // channel = mock(ActionListener.class); - anomalyDetectionIndices = mock(ADIndexManagement.class); when(anomalyDetectionIndices.doesConfigIndexExist()).thenReturn(true); diff --git a/src/test/java/org/opensearch/ad/e2e/AbstractSyntheticDataTest.java b/src/test/java/org/opensearch/ad/AbstractADSyntheticDataTest.java similarity index 57% rename from src/test/java/org/opensearch/ad/e2e/AbstractSyntheticDataTest.java rename to src/test/java/org/opensearch/ad/AbstractADSyntheticDataTest.java index 5cfadcd1c..0adfb7c0f 100644 --- a/src/test/java/org/opensearch/ad/e2e/AbstractSyntheticDataTest.java +++ b/src/test/java/org/opensearch/ad/AbstractADSyntheticDataTest.java @@ -9,14 +9,13 @@ * GitHub history for details. */ -package org.opensearch.ad.e2e; +package org.opensearch.ad; import static org.opensearch.timeseries.TestHelpers.toHttpEntity; import java.io.File; import java.io.FileReader; import java.io.IOException; -import java.io.InputStreamReader; import java.nio.charset.Charset; import java.time.Instant; import java.util.ArrayList; @@ -29,16 +28,10 @@ import org.apache.hc.core5.http.message.BasicHeader; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.Logger; -import org.opensearch.ad.ODFERestTestCase; import org.opensearch.client.Request; -import org.opensearch.client.RequestOptions; -import org.opensearch.client.Response; import org.opensearch.client.RestClient; -import org.opensearch.client.WarningsHandler; -import org.opensearch.common.xcontent.json.JsonXContent; -import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.timeseries.AbstractSyntheticDataTest; import org.opensearch.timeseries.TestHelpers; -import org.opensearch.timeseries.settings.TimeSeriesSettings; import com.google.common.collect.ImmutableList; import com.google.gson.JsonArray; @@ -46,62 +39,11 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; -public class AbstractSyntheticDataTest extends ODFERestTestCase { - protected static final Logger LOG = (Logger) LogManager.getLogger(AbstractSyntheticDataTest.class); +public class AbstractADSyntheticDataTest extends AbstractSyntheticDataTest { + public static final Logger LOG = (Logger) LogManager.getLogger(AbstractADSyntheticDataTest.class); private static int batchSize = 1000; - /** - * In real time AD, we mute a node for a detector if that node keeps returning - * ResourceNotFoundException (5 times in a row). This is a problem for batch mode - * testing as we issue a large amount of requests quickly. Due to the speed, we - * won't be able to finish cold start before the ResourceNotFoundException mutes - * a node. Since our test case has only one node, there is no other nodes to fall - * back on. Here we disable such fault tolerance by setting max retries before - * muting to a large number and the actual wait time during muting to 0. - * - * @throws IOException when failing to create http request body - */ - protected void disableResourceNotFoundFaultTolerence() throws IOException { - XContentBuilder settingCommand = JsonXContent.contentBuilder(); - - settingCommand.startObject(); - settingCommand.startObject("persistent"); - settingCommand.field(TimeSeriesSettings.MAX_RETRY_FOR_UNRESPONSIVE_NODE.getKey(), 100_000); - settingCommand.field(TimeSeriesSettings.BACKOFF_MINUTES.getKey(), 0); - settingCommand.endObject(); - settingCommand.endObject(); - Request request = new Request("PUT", "/_cluster/settings"); - request.setJsonEntity(settingCommand.toString()); - - adminClient().performRequest(request); - } - - protected List getData(String datasetFileName) throws Exception { - JsonArray jsonArray = JsonParser - .parseReader(new FileReader(new File(getClass().getResource(datasetFileName).toURI()), Charset.defaultCharset())) - .getAsJsonArray(); - List list = new ArrayList<>(jsonArray.size()); - jsonArray.iterator().forEachRemaining(i -> list.add(i.getAsJsonObject())); - return list; - } - - protected JsonArray getHits(RestClient client, Request request) throws IOException { - Response response = client.performRequest(request); - return parseHits(response); - } - - protected JsonArray parseHits(Response response) throws IOException { - JsonObject json = JsonParser - .parseReader(new InputStreamReader(response.getEntity().getContent(), Charset.defaultCharset())) - .getAsJsonObject(); - JsonObject hits = json.getAsJsonObject("hits"); - if (hits == null) { - return null; - } - return hits.getAsJsonArray("hits"); - } - protected void runDetectionResult(String detectorId, Instant begin, Instant end, RestClient client, int entitySize) throws IOException, InterruptedException { // trigger run in current interval @@ -117,47 +59,48 @@ protected void runDetectionResult(String detectorId, Instant begin, Instant end, Thread.sleep(50 * entitySize); } - protected List getAnomalyResult(String detectorId, Instant end, int entitySize, RestClient client) { - try { - Request request = new Request("POST", "/_plugins/_anomaly_detection/detectors/results/_search"); - - String jsonTemplate = "{\n" - + " \"query\": {\n" - + " \"bool\": {\n" - + " \"filter\": [\n" - + " {\n" - + " \"term\": {\n" - + " \"detector_id\": \"%s\"\n" - + " }\n" - + " },\n" - + " {\n" - + " \"range\": {\n" - + " \"anomaly_grade\": {\n" - + " \"gte\": 0\n" - + " }\n" - + " }\n" - + " },\n" - + " {\n" - + " \"range\": {\n" - + " \"data_end_time\": {\n" - + " \"gte\": %d,\n" - + " \"lte\": %d\n" - + " }\n" - + " }\n" - + " }\n" - + " ]\n" - + " }\n" - + " }\n" - + "}"; - - long dateEndTime = end.toEpochMilli(); - String formattedJson = String.format(Locale.ROOT, jsonTemplate, detectorId, dateEndTime, dateEndTime); - request.setJsonEntity(formattedJson); - - // wait until results are available - // max wait for 60_000 milliseconds - int maxWaitCycles = 30; - do { + protected List getAnomalyResult(String detectorId, Instant end, int entitySize, RestClient client) + throws InterruptedException { + Request request = new Request("POST", "/_plugins/_anomaly_detection/detectors/results/_search"); + + String jsonTemplate = "{\n" + + " \"query\": {\n" + + " \"bool\": {\n" + + " \"filter\": [\n" + + " {\n" + + " \"term\": {\n" + + " \"detector_id\": \"%s\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"range\": {\n" + + " \"anomaly_grade\": {\n" + + " \"gte\": 0\n" + + " }\n" + + " }\n" + + " },\n" + + " {\n" + + " \"range\": {\n" + + " \"data_end_time\": {\n" + + " \"gte\": %d,\n" + + " \"lte\": %d\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " }\n" + + "}"; + + long dateEndTime = end.toEpochMilli(); + String formattedJson = String.format(Locale.ROOT, jsonTemplate, detectorId, dateEndTime, dateEndTime); + request.setJsonEntity(formattedJson); + + // wait until results are available + // max wait for 60_000 milliseconds + int maxWaitCycles = 30; + do { + try { JsonArray hits = getHits(client, request); if (hits != null && hits.size() == entitySize) { assertTrue("empty response", hits != null); @@ -170,16 +113,34 @@ protected List getAnomalyResult(String detectorId, Instant end, int return res; } else { - LOG.info("wait for result, previous result: {}", hits); + LOG + .info( + "wait for result, previous result: {}, size: {}, eval result {}, expected {}", + hits, + hits.size(), + hits != null && hits.size() == entitySize, + entitySize + ); client.performRequest(new Request("POST", String.format(Locale.ROOT, "/%s/_refresh", ".opendistro-anomaly-results*"))); } Thread.sleep(2_000 * entitySize); - } while (maxWaitCycles-- >= 0); + } catch (Exception e) { + LOG.warn("Exception while waiting for result", e); + Thread.sleep(2_000 * entitySize); + } + } while (maxWaitCycles-- >= 0); - return new ArrayList<>(); + // leave some debug information before returning empty + try { + String matchAll = "{\n" + " \"size\": 1000,\n" + " \"query\": {\n" + " \"match_all\": {}\n" + " }\n" + "}"; + request.setJsonEntity(matchAll); + JsonArray hits = getHits(client, request); + LOG.info("match all result: {}", hits); } catch (Exception e) { - throw new RuntimeException(e); + LOG.warn("Exception while waiting for match all result", e); } + + return new ArrayList<>(); } protected double getAnomalyGrade(JsonObject source) { @@ -205,36 +166,6 @@ protected Instant getAnomalyTime(JsonObject source, Instant defaultVal) { return defaultVal; } - protected void createIndex(String datasetName, RestClient client, String mapping) throws IOException, InterruptedException { - Request request = new Request("PUT", datasetName); - request.setJsonEntity(mapping); - setWarningHandler(request, false); - client.performRequest(request); - Thread.sleep(1_000); - } - - protected void bulkIndexTrainData(String datasetName, List data, int trainTestSplit, RestClient client, String mapping) - throws Exception { - createIndex(datasetName, client, mapping); - - StringBuilder bulkRequestBuilder = new StringBuilder(); - for (int i = 0; i < trainTestSplit; i++) { - bulkRequestBuilder.append("{ \"index\" : { \"_index\" : \"" + datasetName + "\", \"_id\" : \"" + i + "\" } }\n"); - bulkRequestBuilder.append(data.get(i).toString()).append("\n"); - } - TestHelpers - .makeRequest( - client, - "POST", - "_bulk?refresh=true", - null, - toHttpEntity(bulkRequestBuilder.toString()), - ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana")) - ); - Thread.sleep(1_000); - waitAllSyncheticDataIngested(trainTestSplit, datasetName, client); - } - protected String createDetector(RestClient client, String detectorJson) throws Exception { Request request = new Request("POST", "/_plugins/_anomaly_detection/detectors/"); @@ -245,50 +176,6 @@ protected String createDetector(RestClient client, String detectorJson) throws E return detectorId; } - @Override - protected void waitAllSyncheticDataIngested(int expectedSize, String datasetName, RestClient client) throws Exception { - int maxWaitCycles = 3; - do { - Request request = new Request("POST", String.format(Locale.ROOT, "/%s/_search", datasetName)); - request - .setJsonEntity( - String - .format( - Locale.ROOT, - "{\"query\": {" - + " \"match_all\": {}" - + " }," - + " \"size\": 1," - + " \"sort\": [" - + " {" - + " \"timestamp\": {" - + " \"order\": \"desc\"" - + " }" - + " }" - + " ]}" - ) - ); - // Make sure all of the test data has been ingested - JsonArray hits = getHits(client, request); - LOG.info("Latest synthetic data:" + hits); - if (hits != null - && hits.size() == 1 - && expectedSize - 1 == hits.get(0).getAsJsonObject().getAsJsonPrimitive("_id").getAsLong()) { - break; - } else { - request = new Request("POST", String.format(Locale.ROOT, "/%s/_refresh", datasetName)); - client.performRequest(request); - } - Thread.sleep(1_000); - } while (maxWaitCycles-- >= 0); - } - - protected void setWarningHandler(Request request, boolean strictDeprecationMode) { - RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder(); - options.setWarningsHandler(strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE); - request.setOptions(options.build()); - } - protected void startDetector(String detectorId, RestClient client) throws Exception { Request request = new Request("POST", String.format(Locale.ROOT, "/_plugins/_anomaly_detection/detectors/%s/_start", detectorId)); @@ -359,7 +246,7 @@ protected void waitForInitDetector(String detectorId, RestClient client) throws * @param detectorId Detector Id * @param client OpenSearch Client * @param end date end time of the most recent detection period - * @param entitySize the number of entities + * @param entitySize the number of entity results to wait for * @throws Exception when failing to query/indexing from/to OpenSearch */ protected void simulateWaitForInitDetector(String detectorId, RestClient client, Instant end, int entitySize) throws Exception { @@ -381,16 +268,18 @@ protected void simulateWaitForInitDetector(String detectorId, RestClient client, assertTrue("time out while waiting for initing detector", duration <= 60_000); } - protected void bulkIndexData(List data, String datasetName, RestClient client, String mapping) throws Exception { + protected void bulkIndexData(List data, String datasetName, RestClient client, String mapping, int ingestDataSize) + throws Exception { createIndex(datasetName, client, mapping); StringBuilder bulkRequestBuilder = new StringBuilder(); LOG.info("data size {}", data.size()); int count = 0; - for (int i = 0; i < data.size(); i++) { + int pickedIngestSize = Math.min(ingestDataSize, data.size()); + for (int i = 0; i < pickedIngestSize; i++) { bulkRequestBuilder.append("{ \"index\" : { \"_index\" : \"" + datasetName + "\", \"_id\" : \"" + i + "\" } }\n"); bulkRequestBuilder.append(data.get(i).toString()).append("\n"); count++; - if (count >= batchSize || i == data.size() - 1) { + if (count >= batchSize || i == pickedIngestSize - 1) { count = 0; TestHelpers .makeRequest( @@ -433,4 +322,13 @@ protected int isAnomaly(Instant time, List> labels) { } return -1; } + + protected List getData(String datasetFileName) throws Exception { + JsonArray jsonArray = JsonParser + .parseReader(new FileReader(new File(getClass().getResource(datasetFileName).toURI()), Charset.defaultCharset())) + .getAsJsonArray(); + List list = new ArrayList<>(jsonArray.size()); + jsonArray.iterator().forEachRemaining(i -> list.add(i.getAsJsonObject())); + return list; + } } diff --git a/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java b/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java index 77ed1226e..fe9776f11 100644 --- a/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java +++ b/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java @@ -48,7 +48,9 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.ActionRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexRequest; @@ -65,7 +67,12 @@ import org.opensearch.ad.task.ADTaskManager; import org.opensearch.ad.transport.AnomalyResultAction; import org.opensearch.ad.transport.AnomalyResultResponse; +import org.opensearch.client.AdminClient; import org.opensearch.client.Client; +import org.opensearch.client.IndicesAdminClient; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -97,6 +104,7 @@ import org.opensearch.timeseries.common.exception.EndRunException; import org.opensearch.timeseries.common.exception.TimeSeriesException; import org.opensearch.timeseries.constant.CommonName; +import org.opensearch.timeseries.function.ExecutorFunction; import org.opensearch.timeseries.model.FeatureData; import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.model.Job; @@ -163,6 +171,8 @@ public class AnomalyDetectorJobRunnerTests extends AbstractTimeSeriesTest { private ADIndexManagement anomalyDetectionIndices; + private Settings settings; + @BeforeClass public static void setUpBeforeClass() { setUpThreadPool(AnomalyDetectorJobRunnerTests.class.getSimpleName()); @@ -198,7 +208,7 @@ public void setup() throws Exception { adJobProcessor.setTaskManager(adTaskManager); - Settings settings = Settings + settings = Settings .builder() .put("plugins.anomaly_detection.max_retry_for_backoff", 2) .put("plugins.anomaly_detection.backoff_initial_delay", TimeValue.timeValueMillis(1)) @@ -861,4 +871,156 @@ public void testMarkResultIndexQueried() throws IOException { assertEquals(TimeSeriesSettings.NUM_MIN_SAMPLES, totalUpdates.getValue().longValue()); assertEquals(true, adTaskCacheManager.hasQueriedResultIndex(detector.getId())); } + + public void testValidateCustomResult() throws IOException { + String resultIndex = ADCommonName.CUSTOM_RESULT_INDEX_PREFIX + "index"; + when(jobParameter.getCustomResultIndexOrAlias()).thenReturn(resultIndex); + doAnswer(invocation -> { + ExecutorFunction function = invocation.getArgument(4); + function.execute(); + return null; + }).when(anomalyDetectionIndices).validateCustomIndexForBackendJob(any(), any(), any(), any(), any(ExecutorFunction.class), any()); + LockModel lock = new LockModel(CommonName.JOB_INDEX, jobParameter.getName(), Instant.now(), 10, false); + Instant executionStartTime = confirmInitializedSetup(); + + adJobProcessor.runJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); + verify(client, times(1)).execute(eq(AnomalyResultAction.INSTANCE), any(), any()); + } + + enum CreateIndexMode { + NOT_ACKED, + RESOUCE_EXISTS, + RUNTIME_EXCEPTION + } + + private void setUpValidateCustomResultIndex(CreateIndexMode mode) throws IOException { + String resultIndexAlias = ADCommonName.CUSTOM_RESULT_INDEX_PREFIX + "index"; + when(jobParameter.getCustomResultIndexOrAlias()).thenReturn(resultIndexAlias); + + ClusterName clusterName = new ClusterName("test"); + ClusterState clusterState = ClusterState.builder(clusterName).metadata(Metadata.builder().build()).build(); + when(clusterService.state()).thenReturn(clusterState); + ClusterSettings clusterSettings = new ClusterSettings( + Settings.EMPTY, + Collections + .unmodifiableSet( + new HashSet<>( + Arrays + .asList( + AnomalyDetectorSettings.AD_RESULT_HISTORY_MAX_DOCS_PER_SHARD, + AnomalyDetectorSettings.AD_RESULT_HISTORY_ROLLOVER_PERIOD, + AnomalyDetectorSettings.AD_RESULT_HISTORY_RETENTION_PERIOD, + AnomalyDetectorSettings.AD_MAX_PRIMARY_SHARDS + ) + ) + ) + ); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + AdminClient adminClient = mock(AdminClient.class); + IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(1); + if (CreateIndexMode.NOT_ACKED == mode) { + // not acked + listener.onResponse(new CreateIndexResponse(false, false, resultIndexAlias)); + } else if (CreateIndexMode.RESOUCE_EXISTS == mode) { + listener.onFailure(new ResourceAlreadyExistsException("index created")); + } else { + listener.onFailure(new RuntimeException()); + } + + return null; + }).when(indicesAdminClient).create(any(), any(ActionListener.class)); + + when(client.admin()).thenReturn(adminClient); + when(adminClient.indices()).thenReturn(indicesAdminClient); + + anomalyDetectionIndices = new ADIndexManagement( + client, + clusterService, + threadPool, + settings, + nodeFilter, + 1, + NamedXContentRegistry.EMPTY + ); + adJobProcessor.setIndexManagement(anomalyDetectionIndices); + } + + public void testNotAckedValidCustomResultCreation() throws IOException { + setUpValidateCustomResultIndex(CreateIndexMode.NOT_ACKED); + LockModel lock = new LockModel(CommonName.JOB_INDEX, jobParameter.getName(), Instant.now(), 10, false); + Instant executionStartTime = confirmInitializedSetup(); + + assertEquals( + "Expect 0 EndRunException of " + + jobParameter.getName() + + ". Got " + + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue(), + 0, + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue() + ); + adJobProcessor.runJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); + assertEquals( + "Expect 1 EndRunException of " + + jobParameter.getName() + + ". Got " + + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue(), + 1, + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue() + ); + } + + public void testCustomResultExistsWhileCreation() throws IOException { + setUpValidateCustomResultIndex(CreateIndexMode.RESOUCE_EXISTS); + LockModel lock = new LockModel(CommonName.JOB_INDEX, jobParameter.getName(), Instant.now(), 10, false); + Instant executionStartTime = confirmInitializedSetup(); + + assertEquals( + "Expect 0 EndRunException of " + + jobParameter.getName() + + ". Got " + + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue(), + 0, + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue() + ); + adJobProcessor.runJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); + assertEquals( + "Expect 0 EndRunException of " + + jobParameter.getName() + + ". Got " + + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue(), + 0, + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue() + ); + verify(client, times(1)).index(any(), any()); + verify(client, times(1)).delete(any(), any()); + } + + public void testUnexpectedWhileCreation() throws IOException { + setUpValidateCustomResultIndex(CreateIndexMode.RUNTIME_EXCEPTION); + LockModel lock = new LockModel(CommonName.JOB_INDEX, jobParameter.getName(), Instant.now(), 10, false); + Instant executionStartTime = confirmInitializedSetup(); + + assertEquals( + "Expect 0 EndRunException of " + + jobParameter.getName() + + ". Got " + + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue(), + 0, + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue() + ); + adJobProcessor.runJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); + assertEquals( + "Expect 1 EndRunException of " + + jobParameter.getName() + + ". Got " + + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue(), + 1, + adJobProcessor.getEndRunExceptionCount(jobParameter.getName()).intValue() + ); + verify(client, times(0)).index(any(), any()); + verify(client, times(0)).delete(any(), any()); + } } diff --git a/src/test/java/org/opensearch/ad/AnomalyDetectorRestTestCase.java b/src/test/java/org/opensearch/ad/AnomalyDetectorRestTestCase.java index ef2047466..dc0ea5b4f 100644 --- a/src/test/java/org/opensearch/ad/AnomalyDetectorRestTestCase.java +++ b/src/test/java/org/opensearch/ad/AnomalyDetectorRestTestCase.java @@ -15,44 +15,32 @@ import java.io.IOException; import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Path; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Locale; import java.util.Map; -import javax.management.MBeanServerInvocationHandler; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.management.remote.JMXConnector; -import javax.management.remote.JMXConnectorFactory; -import javax.management.remote.JMXServiceURL; - import org.apache.hc.core5.http.HttpHeaders; import org.apache.hc.core5.http.message.BasicHeader; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.Logger; -import org.junit.AfterClass; import org.opensearch.ad.model.ADTask; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyDetectorExecutionInput; -import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.RestClient; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.LoggingDeprecationHandler; -import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContent; -import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.test.rest.OpenSearchRestTestCase; +import org.opensearch.timeseries.ODFERestTestCase; import org.opensearch.timeseries.TestHelpers; import org.opensearch.timeseries.model.DateRange; import org.opensearch.timeseries.model.Job; @@ -63,7 +51,7 @@ import com.google.gson.JsonArray; public abstract class AnomalyDetectorRestTestCase extends ODFERestTestCase { - private static final Logger LOG = (Logger) LogManager.getLogger(AnomalyDetectorRestTestCase.class); + public static final Logger LOG = (Logger) LogManager.getLogger(AnomalyDetectorRestTestCase.class); public static final int MAX_RETRY_TIMES = 10; @@ -337,21 +325,6 @@ protected final XContentParser createAdParser(XContent xContent, InputStream dat return xContent.createParser(TestHelpers.xContentRegistry(), LoggingDeprecationHandler.INSTANCE, data); } - public void updateClusterSettings(String settingKey, Object value) throws Exception { - XContentBuilder builder = XContentFactory - .jsonBuilder() - .startObject() - .startObject("persistent") - .field(settingKey, value) - .endObject() - .endObject(); - Request request = new Request("PUT", "_cluster/settings"); - request.setJsonEntity(builder.toString()); - Response response = client().performRequest(request); - assertEquals(RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); - Thread.sleep(2000); // sleep some time to resolve flaky test - } - public Response getDetectorProfile(String detectorId, boolean all, String customizedProfile, RestClient client) throws IOException { return TestHelpers .makeRequest( @@ -683,47 +656,4 @@ protected Response validateAnomalyDetector(AnomalyDetector detector, RestClient null ); } - - /** - * We need to be able to dump the jacoco coverage before cluster is shut down. - * The new internal testing framework removed some of the gradle tasks we were listening to - * to choose a good time to do it. This will dump the executionData to file after each test. - * TODO: This is also currently just overwriting integTest.exec with the updated execData without - * resetting after writing each time. This can be improved to either write an exec file per test - * or by letting jacoco append to the file - */ - public interface IProxy { - byte[] getExecutionData(boolean reset); - - void dump(boolean reset); - - void reset(); - } - - @AfterClass - public static void dumpCoverage() throws IOException, MalformedObjectNameException { - // jacoco.dir is set in esplugin-coverage.gradle, if it doesn't exist we don't - // want to collect coverage so we can return early - String jacocoBuildPath = System.getProperty("jacoco.dir"); - if (org.opensearch.core.common.Strings.isNullOrEmpty(jacocoBuildPath)) { - return; - } - - String serverUrl = System.getProperty("jmx.serviceUrl"); - if (serverUrl == null) { - LOG.error("Failed to dump coverage because JMX Service URL is null"); - throw new IllegalArgumentException("JMX Service URL is null"); - } - - try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(serverUrl))) { - IProxy proxy = MBeanServerInvocationHandler - .newProxyInstance(connector.getMBeanServerConnection(), new ObjectName("org.jacoco:type=Runtime"), IProxy.class, false); - - Path path = Path.of(Path.of(jacocoBuildPath, "integTest.exec").toFile().getCanonicalPath()); - Files.write(path, proxy.getExecutionData(false)); - } catch (Exception ex) { - LOG.error("Failed to dump coverage: ", ex); - throw new RuntimeException("Failed to dump coverage: " + ex); - } - } } diff --git a/src/test/java/org/opensearch/ad/HistoricalAnalysisRestTestCase.java b/src/test/java/org/opensearch/ad/HistoricalAnalysisRestTestCase.java index 588598400..d71b3370b 100644 --- a/src/test/java/org/opensearch/ad/HistoricalAnalysisRestTestCase.java +++ b/src/test/java/org/opensearch/ad/HistoricalAnalysisRestTestCase.java @@ -28,7 +28,6 @@ import org.apache.hc.core5.http.message.BasicHeader; import org.junit.Before; import org.opensearch.ad.mock.model.MockSimpleLog; -import org.opensearch.ad.model.ADTask; import org.opensearch.ad.model.ADTaskProfile; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.client.Response; @@ -37,7 +36,6 @@ import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.search.aggregations.AggregationBuilder; -import org.opensearch.timeseries.TaskProfile; import org.opensearch.timeseries.TestHelpers; import org.opensearch.timeseries.model.DateRange; import org.opensearch.timeseries.model.Feature; @@ -253,7 +251,7 @@ protected List waitUntilTaskDone(String detectorId) throws InterruptedEx protected List waitUntilTaskReachState(String detectorId, Set targetStates) throws InterruptedException { List results = new ArrayList<>(); int i = 0; - TaskProfile adTaskProfile = null; + ADTaskProfile adTaskProfile = null; // Increase retryTimes if some task can't reach done state while ((adTaskProfile == null || !targetStates.contains(adTaskProfile.getTask().getState())) && i < MAX_RETRY_TIMES) { try { @@ -270,4 +268,25 @@ protected List waitUntilTaskReachState(String detectorId, Set ta results.add(i); return results; } + + protected List waitUntilEntityCountAvailable(String detectorId) throws InterruptedException { + List results = new ArrayList<>(); + int i = 0; + ADTaskProfile adTaskProfile = null; + // Increase retryTimes if some task can't reach done state + while ((adTaskProfile == null || adTaskProfile.getTotalEntitiesCount() == null) && i < MAX_RETRY_TIMES) { + try { + adTaskProfile = getADTaskProfile(detectorId); + } catch (Exception e) { + logger.error("failed to get ADTaskProfile", e); + } finally { + Thread.sleep(1000); + } + i++; + } + assertNotNull(adTaskProfile); + results.add(adTaskProfile); + results.add(i); + return results; + } } diff --git a/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java b/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java index 1e26f4ac6..7c09709f0 100644 --- a/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java +++ b/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -45,6 +46,7 @@ import org.apache.logging.log4j.Logger; import org.junit.Before; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.opensearch.ad.ml.ADCheckpointDao; import org.opensearch.ad.ml.ADModelManager; import org.opensearch.ad.model.AnomalyDetector; @@ -750,4 +752,40 @@ public void testRemoveEntityModel() { verify(checkpoint, times(1)).deleteModelCheckpoint(eq(entity2.getModelId(detectorId).get()), any()); verify(checkpointWriteQueue, never()).write(any(), anyBoolean(), any()); } + + public void testGetTotalUpdates_orElseGetBranch() { + // Mock the ModelState and its methods + ModelState mockModelState = Mockito.mock(ModelState.class); + + // Mock the getModel method to return an empty Optional + when(mockModelState.getModel()).thenReturn(Optional.empty()); + + // Mock the getSamples method to return a list with a specific size + Deque mockSamples = Mockito.mock(Deque.class); + when(mockSamples.size()).thenReturn(5); + when(mockModelState.getSamples()).thenReturn(mockSamples); + + // Call the method under test + long result = entityCache.getTotalUpdates(mockModelState); + + // Assert that the result is the size of the samples + assertEquals(5L, result); + } + + public void testGetTotalUpdates_orElseGetBranchWithNullSamples() { + // Mock the ModelState and its methods + ModelState mockModelState = Mockito.mock(ModelState.class); + + // Mock the getModel method to return an empty Optional + when(mockModelState.getModel()).thenReturn(Optional.empty()); + + // Mock the getSamples method to return null + when(mockModelState.getSamples()).thenReturn(null); + + // Call the method under test + long result = entityCache.getTotalUpdates(mockModelState); + + // Assert that the result is 0L + assertEquals(0L, result); + } } diff --git a/src/test/java/org/opensearch/ad/e2e/AbstractRuleTestCase.java b/src/test/java/org/opensearch/ad/e2e/AbstractRuleTestCase.java new file mode 100644 index 000000000..a5d6c8686 --- /dev/null +++ b/src/test/java/org/opensearch/ad/e2e/AbstractRuleTestCase.java @@ -0,0 +1,215 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ad.e2e; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Locale; +import java.util.TreeMap; + +import org.opensearch.ad.AbstractADSyntheticDataTest; +import org.opensearch.client.RestClient; + +import com.google.gson.JsonObject; + +public class AbstractRuleTestCase extends AbstractADSyntheticDataTest { + + protected static class TrainResult { + String detectorId; + List data; + // actual index of training data. As we have multiple entities, + // trainTestSplit means how many groups of entities are used for training. + // rawDataTrainTestSplit is the actual index of training data. + int rawDataTrainTestSplit; + Duration windowDelay; + + public TrainResult(String detectorId, List data, int rawDataTrainTestSplit, Duration windowDelay) { + this.detectorId = detectorId; + this.data = data; + this.rawDataTrainTestSplit = rawDataTrainTestSplit; + this.windowDelay = windowDelay; + } + } + + /** + * Ingest all of the data in file datasetName + * + * @param datasetName data set file name + * @param intervalMinutes detector interval + * @param numberOfEntities number of entities in the file + * @param trainTestSplit used to calculate train start time + * @param useDateNanos whether to use nano date type in detector timestamp + * @return TrainResult for the following method calls + * @throws Exception failing to ingest data + */ + protected TrainResult ingestTrainData( + String datasetName, + int intervalMinutes, + int numberOfEntities, + int trainTestSplit, + boolean useDateNanos + ) throws Exception { + return ingestTrainData(datasetName, intervalMinutes, numberOfEntities, trainTestSplit, useDateNanos, -1); + } + + protected TrainResult ingestTrainData( + String datasetName, + int intervalMinutes, + int numberOfEntities, + int trainTestSplit, + boolean useDateNanos, + int ingestDataSize + ) throws Exception { + String dataFileName = String.format(Locale.ROOT, "data/%s.data", datasetName); + + List data = getData(dataFileName); + + RestClient client = client(); + String categoricalField = "componentName"; + String mapping = String + .format( + Locale.ROOT, + "{ \"mappings\": { \"properties\": { \"timestamp\": { \"type\":" + + (useDateNanos ? "\"date_nanos\"" : "\"date\"") + + "}," + + " \"transform._doc_count\": { \"type\": \"integer\" }," + + "\"%s\": { \"type\": \"keyword\"} } } }", + categoricalField + ); + + if (ingestDataSize <= 0) { + bulkIndexData(data, datasetName, client, mapping, data.size()); + } else { + bulkIndexData(data, datasetName, client, mapping, ingestDataSize); + } + + // we need to account that interval can have multiple entity record + int rawDataTrainTestSplit = trainTestSplit * numberOfEntities; + String trainTimeStr = data.get(rawDataTrainTestSplit - 1).get("timestamp").getAsString(); + Instant trainTime = Instant.ofEpochMilli(Long.parseLong(trainTimeStr)); + /* + * The {@code CompositeRetriever.PageIterator.hasNext()} method checks if a request is expired + * relative to the current system time. This method is designed to ensure that the execution time + * is set to either the current time or a future time to prevent premature expirations in our tests. + * + * Also, AD accepts windowDelay in the unit of minutes. Thus, we need to convert the delay in minutes. This will + * make it easier to search for results based on data end time. Otherwise, real data time and the converted + * data time from request time. + * Assume x = real data time. y= real window delay. y'= window delay in minutes. If y and y' are different, + * x + y - y' != x. + */ + long windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes(); + Duration windowDelay = Duration.ofMinutes(windowDelayMinutes); + + String detector = String + .format( + Locale.ROOT, + "{ \"name\": \"test\", \"description\": \"test\", \"time_field\": \"timestamp\"" + + ", \"indices\": [\"%s\"], \"feature_attributes\": [{ \"feature_name\": \"feature 1\", \"feature_enabled\": " + + "\"true\", \"aggregation_query\": { \"Feature1\": { \"sum\": { \"field\": \"transform._doc_count\" } } } }" + + "], \"detection_interval\": { \"period\": { \"interval\": %d, \"unit\": \"Minutes\" } }, " + + "\"category_field\": [\"%s\"], " + + "\"window_delay\": { \"period\": {\"interval\": %d, \"unit\": \"MINUTES\"}}," + + "\"history\": %d," + + "\"schema_version\": 0," + + "\"rules\": [{\"action\": \"ignore_anomaly\", \"conditions\": [{\"feature_name\": \"feature 1\", \"threshold_type\": \"actual_over_expected_ratio\", \"operator\": \"lte\", \"value\": 0.3}, " + + "{\"feature_name\": \"feature 1\", \"threshold_type\": \"expected_over_actual_ratio\", \"operator\": \"lte\", \"value\": 0.3}" + + "]}]" + + "}", + datasetName, + intervalMinutes, + categoricalField, + windowDelayMinutes, + trainTestSplit - 1 + ); + String detectorId = createDetector(client, detector); + LOG.info("Created detector {}", detectorId); + + Instant executeBegin = dataToExecutionTime(trainTime, windowDelay); + Instant executeEnd = executeBegin.plus(intervalMinutes, ChronoUnit.MINUTES); + Instant dataEnd = trainTime.plus(intervalMinutes, ChronoUnit.MINUTES); + + LOG.info("start detector {}", detectorId); + simulateStartDetector(detectorId, executeBegin, executeEnd, client, numberOfEntities); + int resultsToWait = findTrainTimeEntities(rawDataTrainTestSplit - 1, data); + LOG.info("wait for initting detector {}. {} results are expected.", detectorId, resultsToWait); + simulateWaitForInitDetector(detectorId, client, dataEnd, resultsToWait); + + return new TrainResult(detectorId, data, rawDataTrainTestSplit, windowDelay); + } + + /** + * Assume the data is sorted in time. The method look up and below startIndex + * and return how many timestamps equal to timestampStr. + * @param startIndex where to start look for timestamp + * @return how many timestamps equal to timestampStr + */ + protected int findTrainTimeEntities(int startIndex, List data) { + String timestampStr = data.get(startIndex).get("timestamp").getAsString(); + int count = 1; + for (int i = startIndex - 1; i >= 0; i--) { + String trainTimeStr = data.get(i).get("timestamp").getAsString(); + if (trainTimeStr.equals(timestampStr)) { + count++; + } else { + break; + } + } + for (int i = startIndex + 1; i < data.size(); i++) { + String trainTimeStr = data.get(i).get("timestamp").getAsString(); + if (trainTimeStr.equals(timestampStr)) { + count++; + } else { + break; + } + } + return count; + } + + protected Instant dataToExecutionTime(Instant instant, Duration windowDelay) { + return instant.plus(windowDelay); + } + + /** + * + * @param testData current data to score + * @param entityMap a map to record the number of times we have seen a timestamp. Used to detect missing values. + * @param windowDelay ingestion delay + * @param intervalMinutes detector interval + * @param detectorId detector Id + * @param client RestFul client + * @param numberOfEntities the number of entities. + * @return whether we erred out. + */ + protected boolean scoreOneResult( + JsonObject testData, + TreeMap entityMap, + Duration windowDelay, + int intervalMinutes, + String detectorId, + RestClient client, + int numberOfEntities + ) { + String beginTimeStampAsString = testData.get("timestamp").getAsString(); + Integer newCount = entityMap.compute(beginTimeStampAsString, (key, oldValue) -> (oldValue == null) ? 1 : oldValue + 1); + if (newCount > 1) { + // we have seen this timestamp before. Without this line, we will get rcf IllegalArgumentException about out of order tuples + return false; + } + Instant begin = dataToExecutionTime(Instant.ofEpochMilli(Long.parseLong(beginTimeStampAsString)), windowDelay); + Instant end = begin.plus(intervalMinutes, ChronoUnit.MINUTES); + try { + runDetectionResult(detectorId, begin, end, client, numberOfEntities); + } catch (Exception e) { + LOG.error("failed to run detection result", e); + return true; + } + return false; + } + +} diff --git a/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java b/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java index fd1aa5cc9..09aa9d418 100644 --- a/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java +++ b/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java @@ -27,6 +27,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.Logger; +import org.opensearch.ad.AbstractADSyntheticDataTest; import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.RestClient; @@ -39,7 +40,7 @@ import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; -public class DetectionResultEvalutationIT extends AbstractSyntheticDataTest { +public class DetectionResultEvalutationIT extends AbstractADSyntheticDataTest { protected static final Logger LOG = (Logger) LogManager.getLogger(DetectionResultEvalutationIT.class); public void testValidationIntervalRecommendation() throws Exception { diff --git a/src/test/java/org/opensearch/ad/e2e/RuleIT.java b/src/test/java/org/opensearch/ad/e2e/RuleIT.java new file mode 100644 index 000000000..92f733f01 --- /dev/null +++ b/src/test/java/org/opensearch/ad/e2e/RuleIT.java @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ad.e2e; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; + +import com.google.gson.JsonObject; + +public class RuleIT extends AbstractRuleTestCase { + public void testRuleWithDateNanos() throws Exception { + // TODO: this test case will run for a much longer time and timeout with security enabled + if (!isHttps()) { + disableResourceNotFoundFaultTolerence(); + + String datasetName = "rule"; + int intervalMinutes = 10; + int numberOfEntities = 2; + int trainTestSplit = 100; + + TrainResult trainResult = ingestTrainData( + datasetName, + intervalMinutes, + numberOfEntities, + trainTestSplit, + true, + // ingest just enough for finish the test + (trainTestSplit + 1) * numberOfEntities + ); + + List data = trainResult.data; + LOG.info("scoring data at {}", data.get(trainResult.rawDataTrainTestSplit).get("timestamp").getAsString()); + + // one run call will evaluate all entities within an interval + int numberEntitiesScored = findTrainTimeEntities(trainResult.rawDataTrainTestSplit, data); + // an entity might have missing values (e.g., at timestamp 1694713200000). + // Use a map to record the number of times we have seen them. + // data start time -> the number of entities + TreeMap entityMap = new TreeMap<>(); + // rawDataTrainTestSplit is the actual index of next test data. + assertFalse( + scoreOneResult( + data.get(trainResult.rawDataTrainTestSplit), + entityMap, + trainResult.windowDelay, + intervalMinutes, + trainResult.detectorId, + client(), + numberEntitiesScored + ) + ); + + assertEquals("expected 1 timestamp but got " + entityMap.size(), 1, entityMap.size()); + + for (Map.Entry entry : entityMap.entrySet()) { + String beginTimeStampAsString = entry.getKey(); + Instant begin = Instant.ofEpochMilli(Long.parseLong(beginTimeStampAsString)); + Instant end = begin.plus(intervalMinutes, ChronoUnit.MINUTES); + try { + List sourceList = getAnomalyResult(trainResult.detectorId, end, numberEntitiesScored, client()); + + assertTrue( + String + .format( + Locale.ROOT, + "the number of results is %d at %s, expected %d ", + sourceList.size(), + beginTimeStampAsString, + numberEntitiesScored + ), + sourceList.size() == numberEntitiesScored + ); + for (int j = 0; j < numberEntitiesScored; j++) { + JsonObject source = sourceList.get(j); + double anomalyGrade = getAnomalyGrade(source); + assertTrue("anomalyGrade cannot be negative", anomalyGrade >= 0); + } + } catch (Exception e) { + LOG.error("failed to get detection results", e); + assertTrue(false); + } + } + } + } +} diff --git a/src/test/java/org/opensearch/ad/e2e/RuleModelPerfIT.java b/src/test/java/org/opensearch/ad/e2e/RuleModelPerfIT.java index b8e8cff90..208cfbe48 100644 --- a/src/test/java/org/opensearch/ad/e2e/RuleModelPerfIT.java +++ b/src/test/java/org/opensearch/ad/e2e/RuleModelPerfIT.java @@ -32,8 +32,8 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; -public class RuleModelPerfIT extends AbstractSyntheticDataTest { - private static final Logger LOG = (Logger) LogManager.getLogger(RuleModelPerfIT.class); +public class RuleModelPerfIT extends AbstractRuleTestCase { + static final Logger LOG = (Logger) LogManager.getLogger(RuleModelPerfIT.class); public void testRule() throws Exception { // TODO: this test case will run for a much longer time and timeout with security enabled @@ -50,21 +50,6 @@ public void testRule() throws Exception { } } - public void testRuleWithDateNanos() throws Exception { - // TODO: this test case will run for a much longer time and timeout with security enabled - if (!isHttps()) { - disableResourceNotFoundFaultTolerence(); - // there are 8 entities in the data set. Each one needs 1500 rows as training data. - Map minPrecision = new HashMap<>(); - minPrecision.put("Phoenix", 0.5); - minPrecision.put("Scottsdale", 0.5); - Map minRecall = new HashMap<>(); - minRecall.put("Phoenix", 0.9); - minRecall.put("Scottsdale", 0.6); - verifyRule("rule", 10, minPrecision.size(), 1500, minPrecision, minRecall, 20, true); - } - } - private void verifyTestResults( Triple, Integer, Map>> testResults, Map>> anomalies, @@ -144,96 +129,29 @@ public void verifyRule( int maxError, boolean useDateNanos ) throws Exception { - String dataFileName = String.format(Locale.ROOT, "data/%s.data", datasetName); - String labelFileName = String.format(Locale.ROOT, "data/%s.label", datasetName); - List data = getData(dataFileName); + String labelFileName = String.format(Locale.ROOT, "data/%s.label", datasetName); Map>> anomalies = getAnomalyWindowsMap(labelFileName); - RestClient client = client(); - String categoricalField = "componentName"; - String mapping = String - .format( - Locale.ROOT, - "{ \"mappings\": { \"properties\": { \"timestamp\": { \"type\":" - + (useDateNanos ? "\"date_nanos\"" : "\"date\"") - + "}," - + " \"transform._doc_count\": { \"type\": \"integer\" }," - + "\"%s\": { \"type\": \"keyword\"} } } }", - categoricalField - ); - - bulkIndexData(data, datasetName, client, mapping); - - // we need to account that interval can have multiple entity record - int rawDataTrainTestSplit = trainTestSplit * numberOfEntities; - Instant trainTime = Instant.ofEpochMilli(Long.parseLong(data.get(rawDataTrainTestSplit - 1).get("timestamp").getAsString())); - /* - * The {@code CompositeRetriever.PageIterator.hasNext()} method checks if a request is expired - * relative to the current system time. This method is designed to ensure that the execution time - * is set to either the current time or a future time to prevent premature expirations in our tests. - * - * Also, AD accepts windowDelay in the unit of minutes. Thus, we need to convert the delay in minutes. This will - * make it easier to search for results based on data end time. Otherwise, real data time and the converted - * data time from request time. - * Assume x = real data time. y= real window delay. y'= window delay in minutes. If y and y' are different, - * x + y - y' != x. - */ - long windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes(); - Duration windowDelay = Duration.ofMinutes(windowDelayMinutes); - - String detector = String - .format( - Locale.ROOT, - "{ \"name\": \"test\", \"description\": \"test\", \"time_field\": \"timestamp\"" - + ", \"indices\": [\"%s\"], \"feature_attributes\": [{ \"feature_name\": \"feature 1\", \"feature_enabled\": " - + "\"true\", \"aggregation_query\": { \"Feature1\": { \"sum\": { \"field\": \"transform._doc_count\" } } } }" - + "], \"detection_interval\": { \"period\": { \"interval\": %d, \"unit\": \"Minutes\" } }, " - + "\"category_field\": [\"%s\"], " - + "\"window_delay\": { \"period\": {\"interval\": %d, \"unit\": \"MINUTES\"}}," - + "\"history\": %d," - + "\"schema_version\": 0," - + "\"rules\": [{\"action\": \"ignore_anomaly\", \"conditions\": [{\"feature_name\": \"feature 1\", \"threshold_type\": \"actual_over_expected_ratio\", \"operator\": \"lte\", \"value\": 0.3}, " - + "{\"feature_name\": \"feature 1\", \"threshold_type\": \"expected_over_actual_ratio\", \"operator\": \"lte\", \"value\": 0.3}" - + "]}]" - + "}", - datasetName, - intervalMinutes, - categoricalField, - windowDelayMinutes, - trainTestSplit - 1 - ); - String detectorId = createDetector(client, detector); - LOG.info("Created detector {}", detectorId); - - Instant executeBegin = dataToExecutionTime(trainTime, windowDelay); - Instant executeEnd = executeBegin.plus(intervalMinutes, ChronoUnit.MINUTES); - Instant dataEnd = trainTime.plus(intervalMinutes, ChronoUnit.MINUTES); - - simulateStartDetector(detectorId, executeBegin, executeEnd, client, numberOfEntities); - simulateWaitForInitDetector(detectorId, client, dataEnd, numberOfEntities); + TrainResult trainResult = ingestTrainData(datasetName, intervalMinutes, numberOfEntities, trainTestSplit, useDateNanos); Triple, Integer, Map>> results = getTestResults( - detectorId, - data, - rawDataTrainTestSplit, + trainResult.detectorId, + trainResult.data, + trainResult.rawDataTrainTestSplit, intervalMinutes, anomalies, - client, + client(), numberOfEntities, - windowDelay + trainResult.windowDelay ); verifyTestResults(results, anomalies, minPrecision, minRecall, maxError); } - private Instant dataToExecutionTime(Instant instant, Duration windowDelay) { - return instant.plus(windowDelay); - } - private Triple, Integer, Map>> getTestResults( String detectorId, List data, - int trainTestSplit, + int rawTrainTestSplit, int intervalMinutes, Map>> anomalies, RestClient client, @@ -247,20 +165,9 @@ private Triple, Integer, Map>> getTes // Use a map to record the number of times we have seen them. // data start time -> the number of entities TreeMap entityMap = new TreeMap<>(); - for (int i = trainTestSplit; i < data.size(); i++) { - String beginTimeStampAsString = data.get(i).get("timestamp").getAsString(); - Integer newCount = entityMap.compute(beginTimeStampAsString, (key, oldValue) -> (oldValue == null) ? 1 : oldValue + 1); - if (newCount > 1) { - // we have seen this timestamp before. Without this line, we will get rcf IllegalArgumentException about out of order tuples - continue; - } - Instant begin = dataToExecutionTime(Instant.ofEpochMilli(Long.parseLong(beginTimeStampAsString)), windowDelay); - Instant end = begin.plus(intervalMinutes, ChronoUnit.MINUTES); - try { - runDetectionResult(detectorId, begin, end, client, numberOfEntities); - } catch (Exception e) { + for (int i = rawTrainTestSplit; i < data.size(); i++) { + if (scoreOneResult(data.get(i), entityMap, windowDelay, intervalMinutes, detectorId, client, numberOfEntities)) { errors++; - LOG.error("failed to run detection result", e); } } diff --git a/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java b/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java index 873e033f8..75855a1a8 100644 --- a/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java +++ b/src/test/java/org/opensearch/ad/e2e/SingleStreamModelPerfIT.java @@ -31,6 +31,7 @@ import org.apache.hc.core5.http.message.BasicHeader; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.Logger; +import org.opensearch.ad.AbstractADSyntheticDataTest; import org.opensearch.client.RestClient; import org.opensearch.timeseries.TestHelpers; @@ -39,7 +40,7 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; -public class SingleStreamModelPerfIT extends AbstractSyntheticDataTest { +public class SingleStreamModelPerfIT extends AbstractADSyntheticDataTest { protected static final Logger LOG = (Logger) LogManager.getLogger(SingleStreamModelPerfIT.class); public void testDataset() throws Exception { diff --git a/src/test/java/org/opensearch/ad/rest/HistoricalAnalysisRestApiIT.java b/src/test/java/org/opensearch/ad/rest/HistoricalAnalysisRestApiIT.java index 2c6ebd6b3..0e1078968 100644 --- a/src/test/java/org/opensearch/ad/rest/HistoricalAnalysisRestApiIT.java +++ b/src/test/java/org/opensearch/ad/rest/HistoricalAnalysisRestApiIT.java @@ -11,6 +11,7 @@ package org.opensearch.ad.rest; +import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_MODEL_MAX_SIZE_PERCENTAGE; import static org.opensearch.ad.settings.AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS; import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE; import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_RUNNING_ENTITIES_PER_DETECTOR_FOR_HISTORICAL_ANALYSIS; @@ -28,6 +29,7 @@ import org.apache.hc.core5.http.ParseException; import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.opensearch.ad.HistoricalAnalysisRestTestCase; @@ -59,6 +61,16 @@ public void setUp() throws Exception { updateClusterSettings(MAX_RUNNING_ENTITIES_PER_DETECTOR_FOR_HISTORICAL_ANALYSIS.getKey(), 2); updateClusterSettings(BATCH_TASK_PIECE_INTERVAL_SECONDS.getKey(), 5); updateClusterSettings(MAX_BATCH_TASK_PER_NODE.getKey(), 10); + // increase the AD memory percentage. Since enabling jacoco coverage instrumentation, + // the memory is not enough to finish HistoricalAnalysisRestApiIT. + updateClusterSettings(AD_MODEL_MAX_SIZE_PERCENTAGE.getKey(), 0.5); + } + + @After + @Override + public void tearDown() throws Exception { + updateClusterSettings(AD_MODEL_MAX_SIZE_PERCENTAGE.getKey(), 0.1); + super.tearDown(); } public void testHistoricalAnalysisForSingleEntityDetector() throws Exception { @@ -104,7 +116,6 @@ private void checkIfTaskCanFinishCorrectly(String detectorId, String taskId, Set } } - @SuppressWarnings("unchecked") private List startHistoricalAnalysis(int categoryFieldSize) throws Exception { return startHistoricalAnalysis(categoryFieldSize, null); } @@ -123,6 +134,9 @@ private List startHistoricalAnalysis(int categoryFieldSize, String resul if (!TaskState.RUNNING.name().equals(adTaskProfile.getTask().getState())) { adTaskProfile = (ADTaskProfile) waitUntilTaskReachState(detectorId, ImmutableSet.of(TaskState.RUNNING.name())).get(0); } + // if (adTaskProfile.getTotalEntitiesCount() == null) { + // adTaskProfile = (ADTaskProfile) waitUntilEntityCountAvailable(detectorId).get(0); + // } assertEquals((int) Math.pow(categoryFieldDocCount, categoryFieldSize), adTaskProfile.getTotalEntitiesCount().intValue()); assertTrue(adTaskProfile.getPendingEntitiesCount() > 0); assertTrue(adTaskProfile.getRunningEntitiesCount() > 0); @@ -170,8 +184,13 @@ public void testStopHistoricalAnalysis() throws Exception { waitUntilGetTaskProfile(detectorId); // stop historical detector - Response stopDetectorResponse = stopAnomalyDetector(detectorId, client(), false); - assertEquals(RestStatus.OK, TestHelpers.restStatus(stopDetectorResponse)); + try { + Response stopDetectorResponse = stopAnomalyDetector(detectorId, client(), false); + assertEquals(RestStatus.OK, TestHelpers.restStatus(stopDetectorResponse)); + } catch (Exception e) { + // it is possible the tasks has already stopped + assertTrue("expected No running task found but actual is " + e.getMessage(), e.getMessage().contains("No running task found")); + } // get task profile checkIfTaskCanFinishCorrectly(detectorId, taskId, ImmutableSet.of(TaskState.STOPPED.name())); diff --git a/src/test/java/org/opensearch/forecast/AbstractForecastSyntheticDataTest.java b/src/test/java/org/opensearch/forecast/AbstractForecastSyntheticDataTest.java new file mode 100644 index 000000000..a2575a457 --- /dev/null +++ b/src/test/java/org/opensearch/forecast/AbstractForecastSyntheticDataTest.java @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.forecast; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +import org.opensearch.timeseries.AbstractSyntheticDataTest; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.stream.JsonReader; + +public class AbstractForecastSyntheticDataTest extends AbstractSyntheticDataTest { + /** + * Read data from a json array file up to a specified size + * @param datasetFileName data set file name + * @param size the limit of json elements to read + * @return the read JsonObject list + * @throws URISyntaxException when failing to find datasetFileName + * @throws Exception when there is a parsing error. + */ + public static List readJsonArrayWithLimit(String datasetFileName, int limit) throws URISyntaxException { + List jsonObjects = new ArrayList<>(); + + try ( + FileReader fileReader = new FileReader( + new File(AbstractForecastSyntheticDataTest.class.getClassLoader().getResource(datasetFileName).toURI()), + Charset.defaultCharset() + ); + JsonReader jsonReader = new JsonReader(fileReader) + ) { + + Gson gson = new Gson(); + JsonArray jsonArray = gson.fromJson(jsonReader, JsonArray.class); + + for (int i = 0; i < limit && i < jsonArray.size(); i++) { + JsonObject jsonObject = jsonArray.get(i).getAsJsonObject(); + jsonObjects.add(jsonObject); + } + + } catch (IOException e) { + LOG.error("fail to read json array", e); + } + + return jsonObjects; + } +} diff --git a/src/test/java/org/opensearch/forecast/rest/ForecastRestApiIT.java b/src/test/java/org/opensearch/forecast/rest/ForecastRestApiIT.java new file mode 100644 index 000000000..8776ec971 --- /dev/null +++ b/src/test/java/org/opensearch/forecast/rest/ForecastRestApiIT.java @@ -0,0 +1,455 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.forecast.rest; + +import static org.opensearch.timeseries.util.RestHandlerUtils.SUGGEST; + +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import org.junit.Before; +import org.opensearch.client.Response; +import org.opensearch.client.ResponseException; +import org.opensearch.client.RestClient; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.forecast.AbstractForecastSyntheticDataTest; +import org.opensearch.forecast.model.Forecaster; +import org.opensearch.forecast.settings.ForecastEnabledSetting; +import org.opensearch.timeseries.TestHelpers; +import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin; +import org.opensearch.timeseries.constant.CommonMessages; +import org.opensearch.timeseries.model.Config; + +import com.google.common.collect.ImmutableMap; +import com.google.gson.JsonObject; + +/** + * Test the following Restful API: + * - Suggest + * + */ +public class ForecastRestApiIT extends AbstractForecastSyntheticDataTest { + private static final String SYNTHETIC_DATASET_NAME = "synthetic"; + private static final String RULE_DATASET_NAME = "rule"; + private static final String SUGGEST_INTERVAL_URI; + private static final String SUGGEST_INTERVAL_HORIZON_HISTORY_URI; + + static { + SUGGEST_INTERVAL_URI = String + .format( + Locale.ROOT, + "%s/%s/%s", + TimeSeriesAnalyticsPlugin.FORECAST_FORECASTERS_URI, + SUGGEST, + Forecaster.FORECAST_INTERVAL_FIELD + ); + SUGGEST_INTERVAL_HORIZON_HISTORY_URI = String + .format( + Locale.ROOT, + "%s/%s/%s,%s,%s", + TimeSeriesAnalyticsPlugin.FORECAST_FORECASTERS_URI, + SUGGEST, + Forecaster.FORECAST_INTERVAL_FIELD, + Forecaster.HORIZON_FIELD, + Config.HISTORY_INTERVAL_FIELD + ); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + updateClusterSettings(ForecastEnabledSetting.FORECAST_ENABLED, true); + } + + private static void loadData(String datasetName, int trainTestSplit) throws Exception { + RestClient client = client(); + + String dataFileName = String.format(Locale.ROOT, "org/opensearch/ad/e2e/data/%s.data", datasetName); + + List data = readJsonArrayWithLimit(dataFileName, trainTestSplit); + + String mapping = "{ \"mappings\": { \"properties\": { \"timestamp\": { \"type\": \"date\"}," + + " \"Feature1\": { \"type\": \"double\" }, \"Feature2\": { \"type\": \"double\" } } } }"; + bulkIndexTrainData(datasetName, data, trainTestSplit, client, mapping); + } + + /** + * Test suggest API. I wrote multiple cases together to save time in loading data. Cannot load data once for + * all tests as client is set up in the instance method of OpenSearchRestTestCase and we need client to + * ingest data. Also, the JUnit 5 feature @BeforeAll is not supported in OpenSearchRestTestCase and the code + * inside @BeforeAll won't be executed even if I upgrade junit from 4 to 5 in build.gradle. + * @throws Exception when loading data + */ + public void testSuggestOneMinute() throws Exception { + loadData(SYNTHETIC_DATASET_NAME, 200); + // case 1: suggest 1 minute interval for a time series with 1 minute cadence + String forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"Feature1\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": 20,\n" + + " \"unit\": \"SECONDS\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24\n" + + "}"; + + String formattedForecaster = String.format(Locale.ROOT, forecasterDef, SYNTHETIC_DATASET_NAME); + + Response response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, SUGGEST_INTERVAL_URI), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Suggest forecaster interval failed", RestStatus.OK, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + Map suggestions = (Map) ((Map) responseMap.get("interval")).get("period"); + assertEquals(1, (int) suggestions.get("interval")); + assertEquals("Minutes", suggestions.get("unit")); + + // case 2: If you provide forecaster interval in the request body, we will ignore it. + forecasterDef = "{\n" + + " \"name\": \"Second-Test-Detector-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"Feature1\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": 20,\n" + + " \"unit\": \"SECONDS\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 4,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " }\n" + + "}"; + formattedForecaster = String.format(Locale.ROOT, forecasterDef, SYNTHETIC_DATASET_NAME); + response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, SUGGEST_INTERVAL_URI), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + responseMap = entityAsMap(response); + suggestions = (Map) ((Map) responseMap.get("interval")).get("period"); + assertEquals(1, (int) suggestions.get("interval")); + assertEquals("Minutes", suggestions.get("unit")); + + // case 3: We also support horizon and history. + forecasterDef = "{\n" + + " \"name\": \"Second-Test-Detector-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"Feature1\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": 20,\n" + + " \"unit\": \"SECONDS\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"shingle_size\": 5,\n" + + " \"forecast_interval\": {\n" + + " \"period\": {\n" + + " \"interval\": 4,\n" + + " \"unit\": \"MINUTES\"\n" + + " }\n" + + " }\n" + + "}"; + formattedForecaster = String.format(Locale.ROOT, forecasterDef, SYNTHETIC_DATASET_NAME); + response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, SUGGEST_INTERVAL_HORIZON_HISTORY_URI), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + responseMap = entityAsMap(response); + Map intervalSuggestions = (Map) ((Map) responseMap.get("interval")).get("period"); + assertEquals(1, (int) intervalSuggestions.get("interval")); + assertEquals("Minutes", intervalSuggestions.get("unit")); + + int horizonSuggestions = ((Integer) responseMap.get("horizon")); + assertEquals(15, horizonSuggestions); + + int historySuggestions = ((Integer) responseMap.get("history")); + assertEquals(37, historySuggestions); + } + + public void testSuggestTenMinute() throws Exception { + loadData(RULE_DATASET_NAME, 200); + // case 1: The following request validates against the source data to see if model training might succeed. In this example, the data + // is ingested at a rate of every ten minutes. + final String forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": 20,\n" + + " \"unit\": \"SECONDS\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24\n" + + "}"; + + String formattedForecaster = String.format(Locale.ROOT, forecasterDef, RULE_DATASET_NAME); + + Response response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, SUGGEST_INTERVAL_URI), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Suggest forecaster interval failed", RestStatus.OK, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + Map suggestions = (Map) ((Map) responseMap.get("interval")).get("period"); + assertEquals(10, (int) suggestions.get("interval")); + assertEquals("Minutes", suggestions.get("unit")); + + // case 2: If you provide unsupported parameters, we will throw 400 error. + ResponseException exception = expectThrows( + ResponseException.class, + () -> TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, SUGGEST_INTERVAL_URI + "2"), + ImmutableMap.of(), + TestHelpers.toHttpEntity(forecasterDef), + null + ) + ); + assertEquals(RestStatus.BAD_REQUEST.getStatus(), exception.getResponse().getStatusLine().getStatusCode()); + assertTrue( + "Expect contains" + CommonMessages.NOT_EXISTENT_SUGGEST_TYPE + " ; but got" + exception.getMessage(), + exception.getMessage().contains(CommonMessages.NOT_EXISTENT_SUGGEST_TYPE) + ); + + // case 3: If the provided configuration lacks enough information (e.g., source index), we will throw 400 error. + final String errorForecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"transform._doc_count\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": 20,\n" + + " \"unit\": \"SECONDS\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24\n" + + "}"; + + ResponseException noSourceIndexException = expectThrows( + ResponseException.class, + () -> TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, SUGGEST_INTERVAL_URI), + ImmutableMap.of(), + TestHelpers.toHttpEntity(errorForecasterDef), + null + ) + ); + assertEquals(RestStatus.BAD_REQUEST.getStatus(), noSourceIndexException.getResponse().getStatusLine().getStatusCode()); + assertTrue( + "Expect contains" + CommonMessages.EMPTY_INDICES + " ; but got" + noSourceIndexException.getMessage(), + noSourceIndexException.getMessage().contains(CommonMessages.EMPTY_INDICES) + ); + } + + public void testSuggestSparseData() throws Exception { + loadData(SYNTHETIC_DATASET_NAME, 10); + // case 1: suggest 1 minute interval for a time series with 1 minute cadence + String forecasterDef = "{\n" + + " \"name\": \"Second-Test-Forecaster-4\",\n" + + " \"description\": \"ok rate\",\n" + + " \"time_field\": \"timestamp\",\n" + + " \"indices\": [\n" + + " \"%s\"\n" + + " ],\n" + + " \"feature_attributes\": [\n" + + " {\n" + + " \"feature_id\": \"sum1\",\n" + + " \"feature_name\": \"sum1\",\n" + + " \"feature_enabled\": true,\n" + + " \"importance\": 1,\n" + + " \"aggregation_query\": {\n" + + " \"sum1\": {\n" + + " \"sum\": {\n" + + " \"field\": \"Feature1\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"window_delay\": {\n" + + " \"period\": {\n" + + " \"interval\": 20,\n" + + " \"unit\": \"SECONDS\"\n" + + " }\n" + + " },\n" + + " \"ui_metadata\": {\n" + + " \"aabb\": {\n" + + " \"ab\": \"bb\"\n" + + " }\n" + + " },\n" + + " \"schema_version\": 2,\n" + + " \"horizon\": 24\n" + + "}"; + + String formattedForecaster = String.format(Locale.ROOT, forecasterDef, SYNTHETIC_DATASET_NAME); + + Response response = TestHelpers + .makeRequest( + client(), + "POST", + String.format(Locale.ROOT, SUGGEST_INTERVAL_URI), + ImmutableMap.of(), + TestHelpers.toHttpEntity(formattedForecaster), + null + ); + assertEquals("Suggest forecaster interval failed", RestStatus.OK, TestHelpers.restStatus(response)); + Map responseMap = entityAsMap(response); + // no suggestion + assertEquals(0, responseMap.size()); + } +} diff --git a/src/test/java/org/opensearch/forecast/settings/ForecastEnabledSettingTests.java b/src/test/java/org/opensearch/forecast/settings/ForecastEnabledSettingTests.java index 880ca5a02..09e561e28 100644 --- a/src/test/java/org/opensearch/forecast/settings/ForecastEnabledSettingTests.java +++ b/src/test/java/org/opensearch/forecast/settings/ForecastEnabledSettingTests.java @@ -14,8 +14,8 @@ public void testIsForecastEnabled() { try { ForecastEnabledSetting.getInstance().setSettingValue(ForecastEnabledSetting.FORECAST_ENABLED, true); // TODO: currently, we disable forecasting - // assertTrue(ForecastEnabledSetting.isForecastEnabled()); - assertTrue(!ForecastEnabledSetting.isForecastEnabled()); + assertTrue(ForecastEnabledSetting.isForecastEnabled()); + // assertTrue(!ForecastEnabledSetting.isForecastEnabled()); ForecastEnabledSetting.getInstance().setSettingValue(ForecastEnabledSetting.FORECAST_ENABLED, false); assertTrue(!ForecastEnabledSetting.isForecastEnabled()); } finally { diff --git a/src/test/java/org/opensearch/timeseries/AbstractSyntheticDataTest.java b/src/test/java/org/opensearch/timeseries/AbstractSyntheticDataTest.java new file mode 100644 index 000000000..929670d2b --- /dev/null +++ b/src/test/java/org/opensearch/timeseries/AbstractSyntheticDataTest.java @@ -0,0 +1,157 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.timeseries; + +import static org.opensearch.timeseries.TestHelpers.toHttpEntity; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Locale; + +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.Logger; +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.Response; +import org.opensearch.client.RestClient; +import org.opensearch.client.WarningsHandler; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.timeseries.settings.TimeSeriesSettings; + +import com.google.common.collect.ImmutableList; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +public class AbstractSyntheticDataTest extends ODFERestTestCase { + public static final Logger LOG = (Logger) LogManager.getLogger(AbstractSyntheticDataTest.class); + + /** + * In real time AD, we mute a node for a detector if that node keeps returning + * ResourceNotFoundException (5 times in a row). This is a problem for batch mode + * testing as we issue a large amount of requests quickly. Due to the speed, we + * won't be able to finish cold start before the ResourceNotFoundException mutes + * a node. Since our test case has only one node, there is no other nodes to fall + * back on. Here we disable such fault tolerance by setting max retries before + * muting to a large number and the actual wait time during muting to 0. + * + * @throws IOException when failing to create http request body + */ + protected void disableResourceNotFoundFaultTolerence() throws IOException { + XContentBuilder settingCommand = JsonXContent.contentBuilder(); + + settingCommand.startObject(); + settingCommand.startObject("persistent"); + settingCommand.field(TimeSeriesSettings.MAX_RETRY_FOR_UNRESPONSIVE_NODE.getKey(), 100_000); + settingCommand.field(TimeSeriesSettings.BACKOFF_MINUTES.getKey(), 0); + settingCommand.endObject(); + settingCommand.endObject(); + Request request = new Request("PUT", "/_cluster/settings"); + request.setJsonEntity(settingCommand.toString()); + + adminClient().performRequest(request); + } + + public static void waitAllSyncheticDataIngested(int expectedSize, String datasetName, RestClient client) throws Exception { + int maxWaitCycles = 3; + do { + Request request = new Request("POST", String.format(Locale.ROOT, "/%s/_search", datasetName)); + request + .setJsonEntity( + String + .format( + Locale.ROOT, + "{\"query\": {" + + " \"match_all\": {}" + + " }," + + " \"size\": 1," + + " \"sort\": [" + + " {" + + " \"timestamp\": {" + + " \"order\": \"desc\"" + + " }" + + " }" + + " ]}" + ) + ); + // Make sure all of the test data has been ingested + JsonArray hits = getHits(client, request); + LOG.info("Latest synthetic data:" + hits); + if (hits != null + && hits.size() == 1 + && expectedSize - 1 == hits.get(0).getAsJsonObject().getAsJsonPrimitive("_id").getAsLong()) { + break; + } else { + request = new Request("POST", String.format(Locale.ROOT, "/%s/_refresh", datasetName)); + client.performRequest(request); + } + Thread.sleep(1_000); + } while (maxWaitCycles-- >= 0); + } + + public static JsonArray getHits(RestClient client, Request request) throws IOException { + Response response = client.performRequest(request); + return parseHits(response); + } + + public static JsonArray parseHits(Response response) throws IOException { + JsonObject json = JsonParser + .parseReader(new InputStreamReader(response.getEntity().getContent(), Charset.defaultCharset())) + .getAsJsonObject(); + JsonObject hits = json.getAsJsonObject("hits"); + if (hits == null) { + return null; + } + return hits.getAsJsonArray("hits"); + } + + protected static void bulkIndexTrainData( + String datasetName, + List data, + int trainTestSplit, + RestClient client, + String mapping + ) throws Exception { + createIndex(datasetName, client, mapping); + + StringBuilder bulkRequestBuilder = new StringBuilder(); + for (int i = 0; i < trainTestSplit; i++) { + bulkRequestBuilder.append("{ \"index\" : { \"_index\" : \"" + datasetName + "\", \"_id\" : \"" + i + "\" } }\n"); + bulkRequestBuilder.append(data.get(i).toString()).append("\n"); + } + TestHelpers + .makeRequest( + client, + "POST", + "_bulk?refresh=true", + null, + toHttpEntity(bulkRequestBuilder.toString()), + ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana")) + ); + Thread.sleep(1_000); + waitAllSyncheticDataIngested(trainTestSplit, datasetName, client); + } + + public static void createIndex(String datasetName, RestClient client, String mapping) throws IOException, InterruptedException { + Request request = new Request("PUT", datasetName); + request.setJsonEntity(mapping); + setWarningHandler(request, false); + client.performRequest(request); + Thread.sleep(1_000); + } + + public static void setWarningHandler(Request request, boolean strictDeprecationMode) { + RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder(); + options.setWarningsHandler(strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE); + request.setOptions(options.build()); + } + +} diff --git a/src/test/java/org/opensearch/ad/ODFERestTestCase.java b/src/test/java/org/opensearch/timeseries/ODFERestTestCase.java similarity index 77% rename from src/test/java/org/opensearch/ad/ODFERestTestCase.java rename to src/test/java/org/opensearch/timeseries/ODFERestTestCase.java index 5ee0fec98..eccc2ee53 100644 --- a/src/test/java/org/opensearch/ad/ODFERestTestCase.java +++ b/src/test/java/org/opensearch/timeseries/ODFERestTestCase.java @@ -9,7 +9,7 @@ * GitHub history for details. */ -package org.opensearch.ad; +package org.opensearch.timeseries; import static org.opensearch.client.RestClientBuilder.DEFAULT_MAX_CONN_PER_ROUTE; import static org.opensearch.client.RestClientBuilder.DEFAULT_MAX_CONN_TOTAL; @@ -20,19 +20,23 @@ import static org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_PEMCERT_FILEPATH; import java.io.IOException; -import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; -import java.nio.charset.Charset; +import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import javax.management.MBeanServerInvocationHandler; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; import javax.net.ssl.SSLEngine; import org.apache.hc.client5.http.auth.AuthScope; @@ -50,7 +54,10 @@ import org.apache.hc.core5.reactor.ssl.TlsDetails; import org.apache.hc.core5.ssl.SSLContextBuilder; import org.apache.hc.core5.util.Timeout; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.Logger; import org.junit.After; +import org.junit.AfterClass; import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.RestClient; @@ -59,22 +66,21 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.commons.rest.SecureRestClientBuilder; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.test.rest.OpenSearchRestTestCase; -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; - /** * ODFE integration test base class to support both security disabled and enabled ODFE cluster. */ public abstract class ODFERestTestCase extends OpenSearchRestTestCase { + private static final Logger LOG = (Logger) LogManager.getLogger(ODFERestTestCase.class); protected boolean isHttps() { boolean isHttps = Optional.ofNullable(System.getProperty("https")).map("true"::equalsIgnoreCase).orElse(false); @@ -240,6 +246,33 @@ public TlsDetails create(final SSLEngine sslEngine) { } } + @AfterClass + public static void dumpCoverage() throws IOException, MalformedObjectNameException { + // jacoco.dir is set in esplugin-coverage.gradle, if it doesn't exist we don't + // want to collect coverage so we can return early + String jacocoBuildPath = System.getProperty("jacoco.dir"); + if (org.opensearch.core.common.Strings.isNullOrEmpty(jacocoBuildPath)) { + return; + } + + String serverUrl = System.getProperty("jmx.serviceUrl"); + if (serverUrl == null) { + LOG.error("Failed to dump coverage because JMX Service URL is null"); + throw new IllegalArgumentException("JMX Service URL is null"); + } + + try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(serverUrl))) { + IProxy proxy = MBeanServerInvocationHandler + .newProxyInstance(connector.getMBeanServerConnection(), new ObjectName("org.jacoco:type=Runtime"), IProxy.class, false); + + Path path = Path.of(Path.of(jacocoBuildPath, "integTest.exec").toFile().getCanonicalPath()); + Files.write(path, proxy.getExecutionData(false)); + } catch (Exception ex) { + LOG.error("Failed to dump coverage: ", ex); + throw new RuntimeException("Failed to dump coverage: " + ex); + } + } + /** * wipeAllIndices won't work since it cannot delete security index. Use wipeAllODFEIndices instead. */ @@ -248,45 +281,34 @@ protected boolean preserveIndicesUponCompletion() { return true; } - protected void waitAllSyncheticDataIngested(int expectedSize, String datasetName, RestClient client) throws Exception { - int maxWaitCycles = 3; - do { - Request request = new Request("POST", String.format(Locale.ROOT, "/%s/_search", datasetName)); - request - .setJsonEntity( - String - .format( - Locale.ROOT, - "{\"query\": {" - + " \"match_all\": {}" - + " }," - + " \"size\": 1," - + " \"sort\": [" - + " {" - + " \"timestamp\": {" - + " \"order\": \"desc\"" - + " }" - + " }" - + " ]}" - ) - ); - // Make sure all of the test data has been ingested - // Expected response: - // "_index":"synthetic","_type":"_doc","_id":"10080","_score":null,"_source":{"timestamp":"2019-11-08T00:00:00Z","Feature1":156.30028000000001,"Feature2":100.211205,"host":"host1"},"sort":[1573171200000]} - Response response = client.performRequest(request); - JsonObject json = JsonParser - .parseReader(new InputStreamReader(response.getEntity().getContent(), Charset.defaultCharset())) - .getAsJsonObject(); - JsonArray hits = json.getAsJsonObject("hits").getAsJsonArray("hits"); - if (hits != null - && hits.size() == 1 - && expectedSize - 1 == hits.get(0).getAsJsonObject().getAsJsonPrimitive("_id").getAsLong()) { - break; - } else { - request = new Request("POST", String.format(Locale.ROOT, "/%s/_refresh", datasetName)); - client.performRequest(request); - } - Thread.sleep(1_000); - } while (maxWaitCycles-- >= 0); + public void updateClusterSettings(String settingKey, Object value) throws Exception { + XContentBuilder builder = XContentFactory + .jsonBuilder() + .startObject() + .startObject("persistent") + .field(settingKey, value) + .endObject() + .endObject(); + Request request = new Request("PUT", "_cluster/settings"); + request.setJsonEntity(builder.toString()); + Response response = client().performRequest(request); + assertEquals(RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); + Thread.sleep(2000); // sleep some time to resolve flaky test + } + + /** + * We need to be able to dump the jacoco coverage before cluster is shut down. + * The new internal testing framework removed some of the gradle tasks we were listening to + * to choose a good time to do it. This will dump the executionData to file after each test. + * TODO: This is also currently just overwriting integTest.exec with the updated execData without + * resetting after writing each time. This can be improved to either write an exec file per test + * or by letting jacoco append to the file + */ + public interface IProxy { + byte[] getExecutionData(boolean reset); + + void dump(boolean reset); + + void reset(); } } diff --git a/src/test/java/org/opensearch/timeseries/TestHelpers.java b/src/test/java/org/opensearch/timeseries/TestHelpers.java index 224f6c68c..e633f2734 100644 --- a/src/test/java/org/opensearch/timeseries/TestHelpers.java +++ b/src/test/java/org/opensearch/timeseries/TestHelpers.java @@ -124,6 +124,7 @@ import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.QueryBuilder; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.jobscheduler.spi.schedule.Schedule; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; import org.opensearch.search.SearchModule; @@ -1855,6 +1856,11 @@ public ForecasterBuilder setNullImputationOption() { return this; } + public ForecasterBuilder setHorizon(Integer horizon) { + this.horizon = horizon; + return this; + } + public Forecaster build() { return new Forecaster( forecasterId, @@ -2051,4 +2057,98 @@ public ForecastResult build() { ); } } + + public static class JobBuilder { + private String name = randomAlphaOfLength(10); + private Schedule schedule = randomIntervalSchedule(); + private TimeConfiguration windowDelay = randomIntervalTimeConfiguration(); + private Boolean isEnabled = true; + private Instant enabledTime = Instant.now().truncatedTo(ChronoUnit.SECONDS); + private Instant disabledTime = Instant.now().truncatedTo(ChronoUnit.SECONDS); + private Instant lastUpdateTime = Instant.now().truncatedTo(ChronoUnit.SECONDS); + private Long lockDurationSeconds = 60L; + private User user = randomUser(); + private String resultIndex = null; + private AnalysisType analysisType = AnalysisType.AD; + + public JobBuilder() { + + } + + public static JobBuilder newInstance() { + return new JobBuilder(); + } + + public JobBuilder name(String name) { + this.name = name; + return this; + } + + public JobBuilder schedule(Schedule schedule) { + this.schedule = schedule; + return this; + } + + public JobBuilder windowDelay(TimeConfiguration windowDelay) { + this.windowDelay = windowDelay; + return this; + } + + public JobBuilder isEnabled(Boolean isEnabled) { + this.isEnabled = isEnabled; + return this; + } + + public JobBuilder enabledTime(Instant enabledTime) { + this.enabledTime = enabledTime; + return this; + } + + public JobBuilder disabledTime(Instant disabledTime) { + this.disabledTime = disabledTime; + return this; + } + + public JobBuilder lastUpdateTime(Instant lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + return this; + } + + public JobBuilder lockDurationSeconds(Long lockDurationSeconds) { + this.lockDurationSeconds = lockDurationSeconds; + return this; + } + + public JobBuilder user(User user) { + this.user = user; + return this; + } + + public JobBuilder resultIndex(String resultIndex) { + this.resultIndex = resultIndex; + return this; + } + + public JobBuilder analysisType(AnalysisType analysisType) { + this.analysisType = analysisType; + return this; + } + + public Job build() { + return new Job( + name, + schedule, + windowDelay, + isEnabled, + enabledTime, + disabledTime, + lastUpdateTime, + lockDurationSeconds, + user, + resultIndex, + analysisType + ); + } + } + }