Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Jul 18, 2023
1 parent f0ed43b commit 64c066a
Show file tree
Hide file tree
Showing 448 changed files with 24,189 additions and 13,646 deletions.
12 changes: 8 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,12 @@ dependencies {
implementation group: 'com.yahoo.datasketches', name: 'memory', version: '0.12.2'
implementation group: 'commons-lang', name: 'commons-lang', version: '2.6'
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.10.0'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:3.0-rc3'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.0-rc3'
implementation 'software.amazon.randomcutforest:randomcutforest-core:3.0-rc3'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:3.7.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.7.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:3.7.0'
//implementation files('lib/randomcutforest-core-3.5.0.jar')
//implementation files('lib/randomcutforest-serialization-3.5.0.jar')
//implementation files('lib/randomcutforest-parkservices-3.5.0.jar')

// we inherit jackson-core from opensearch core
implementation "com.fasterxml.jackson.core:jackson-databind:2.14.1"
Expand Down Expand Up @@ -402,7 +405,8 @@ testClusters.integTest {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.getSingleFile()
//return configurations.zipArchive.asFileTree.getSingleFile()
return fileTree("src/test/resources/job-scheduler").getSingleFile()
}
}
}
Expand Down
55 changes: 55 additions & 0 deletions src/main/java/org/opensearch/ad/ADJobProcessor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ad;

import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskCacheManager;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.AnomalyResultAction;
import org.opensearch.ad.transport.AnomalyResultRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.JobProcessor;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
import org.opensearch.timeseries.transport.ResultRequest;

public class ADJobProcessor extends
JobProcessor<ADIndex, ADIndexManagement, ADTaskCacheManager, ADTaskType, ADTask, ADTaskManager, AnomalyResult, ExecuteADResultResponseRecorder> {

private static ADJobProcessor INSTANCE;

public static ADJobProcessor getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
synchronized (JobProcessor.class) {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new ADJobProcessor();
return INSTANCE;
}
}

private ADJobProcessor() {
// Singleton class, use getJobRunnerInstance method instead of constructor
super(AnalysisType.AD, TimeSeriesAnalyticsPlugin.AD_THREAD_POOL_NAME, AnomalyResultAction.INSTANCE);
}

public void registerSettings(Settings settings) {
super.registerSettings(settings, AnomalyDetectorSettings.AD_MAX_RETRY_FOR_END_RUN_EXCEPTION);
}

@Override
protected ResultRequest createResultRequest(String configId, long start, long end) {
return new AnomalyResultRequest(configId, start, end);
}
}
45 changes: 25 additions & 20 deletions src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.constant.CommonMessages.FAIL_TO_FIND_CONFIG_MSG;

import java.util.List;
import java.util.Map;
Expand All @@ -35,10 +34,8 @@
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.DetectorProfile;
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.model.DetectorState;
import org.opensearch.ad.model.InitProgressProfile;
import org.opensearch.ad.settings.ADNumericSetting;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
Expand All @@ -49,9 +46,6 @@
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.RCFPollingResponse;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -68,11 +62,19 @@
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.opensearch.search.aggregations.metrics.InternalCardinality;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.ProfileUtil;
import org.opensearch.timeseries.common.exception.NotSerializedExceptionName;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.ConfigState;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.timeseries.util.ExceptionUtil;
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
import org.opensearch.timeseries.util.SecurityClientUtil;
import org.opensearch.transport.TransportService;

public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
Expand Down Expand Up @@ -136,11 +138,11 @@ private void calculateTotalResponsesToWait(
listener.onFailure(new OpenSearchStatusException(FAIL_TO_PARSE_DETECTOR_MSG + detectorId, BAD_REQUEST));
}
} else {
listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_CONFIG_MSG + detectorId, BAD_REQUEST));
listener.onFailure(new OpenSearchStatusException(CommonMessages.FAIL_TO_FIND_CONFIG_MSG + detectorId, BAD_REQUEST));
}
}, exception -> {
logger.error(FAIL_TO_FIND_CONFIG_MSG + detectorId, exception);
listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_CONFIG_MSG + detectorId, INTERNAL_SERVER_ERROR));
logger.error(CommonMessages.FAIL_TO_FIND_CONFIG_MSG + detectorId, exception);
listener.onFailure(new OpenSearchStatusException(CommonMessages.FAIL_TO_FIND_CONFIG_MSG + detectorId, INTERNAL_SERVER_ERROR));
}));
}

Expand All @@ -159,7 +161,7 @@ private void prepareProfile(
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
Job job = Job.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();

boolean isMultiEntityDetector = detector.isHighCardinality();
Expand Down Expand Up @@ -211,7 +213,7 @@ private void prepareProfile(
false
);
if (profilesToCollect.contains(DetectorProfileName.ERROR)) {
adTaskManager.getAndExecuteOnLatestDetectorLevelTask(detectorId, ADTaskType.REALTIME_TASK_TYPES, adTask -> {
adTaskManager.getAndExecuteOnLatestConfigLevelTask(detectorId, ADTaskType.REALTIME_TASK_TYPES, adTask -> {
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
if (adTask.isPresent()) {
long lastUpdateTimeMs = adTask.get().getLastUpdateTime().toEpochMilli();
Expand Down Expand Up @@ -315,6 +317,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
client::search,
detector.getId(),
client,
AnalysisType.AD,
searchResponseListener
);
} else {
Expand Down Expand Up @@ -368,6 +371,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
client::search,
detector.getId(),
client,
AnalysisType.AD,
searchResponseListener
);
}
Expand All @@ -378,7 +382,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
private void onGetDetectorForPrepare(String detectorId, ActionListener<DetectorProfile> listener, Set<DetectorProfileName> profiles) {
DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
if (profiles.contains(DetectorProfileName.STATE)) {
profileBuilder.state(DetectorState.DISABLED);
profileBuilder.state(ConfigState.DISABLED);
}
if (profiles.contains(DetectorProfileName.AD_TASK)) {
adTaskManager.getLatestHistoricalTaskProfile(detectorId, transportService, profileBuilder.build(), listener);
Expand Down Expand Up @@ -409,7 +413,7 @@ private void profileStateRelated(
} else {
DetectorProfile.Builder builder = new DetectorProfile.Builder();
if (profilesToCollect.contains(DetectorProfileName.STATE)) {
builder.state(DetectorState.DISABLED);
builder.state(ConfigState.DISABLED);
}
listener.onResponse(builder.build());
}
Expand All @@ -418,7 +422,7 @@ private void profileStateRelated(
private void profileModels(
AnomalyDetector detector,
Set<DetectorProfileName> profiles,
AnomalyDetectorJob job,
Job job,
boolean forMultiEntityDetector,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
Expand All @@ -430,7 +434,7 @@ private void profileModels(
private ActionListener<ProfileResponse> onModelResponse(
AnomalyDetector detector,
Set<DetectorProfileName> profilesToCollect,
AnomalyDetectorJob job,
Job job,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
boolean isMultientityDetector = detector.isHighCardinality();
Expand Down Expand Up @@ -464,7 +468,7 @@ private ActionListener<ProfileResponse> onModelResponse(
}

private void profileMultiEntityDetectorStateRelated(
AnomalyDetectorJob job,
Job job,
Set<DetectorProfileName> profilesToCollect,
ProfileResponse profileResponse,
DetectorProfile.Builder profileBuilder,
Expand All @@ -478,10 +482,11 @@ private void profileMultiEntityDetectorStateRelated(
long enabledTime = job.getEnabledTime().toEpochMilli();
long totalUpdates = profileResponse.getTotalUpdates();
ProfileUtil
.confirmDetectorRealtimeInitStatus(
.confirmRealtimeInitStatus(
detector,
enabledTime,
client,
AnalysisType.AD,
onInittedEver(enabledTime, profileBuilder, profilesToCollect, detector, totalUpdates, listener)
);
} else {
Expand All @@ -490,7 +495,7 @@ private void profileMultiEntityDetectorStateRelated(
}
} else {
if (profilesToCollect.contains(DetectorProfileName.STATE)) {
profileBuilder.state(DetectorState.DISABLED);
profileBuilder.state(ConfigState.DISABLED);
}
listener.onResponse(profileBuilder.build());
}
Expand Down Expand Up @@ -577,7 +582,7 @@ private ActionListener<RCFPollingResponse> onPollRCFUpdates(

private void createRunningStateAndInitProgress(Set<DetectorProfileName> profilesToCollect, DetectorProfile.Builder builder) {
if (profilesToCollect.contains(DetectorProfileName.STATE)) {
builder.state(DetectorState.RUNNING).build();
builder.state(ConfigState.RUNNING).build();
}

if (profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)) {
Expand All @@ -595,7 +600,7 @@ private void processInitResponse(
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
if (profilesToCollect.contains(DetectorProfileName.STATE)) {
builder.state(DetectorState.INIT);
builder.state(ConfigState.INIT);
}

if (profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)) {
Expand Down
48 changes: 24 additions & 24 deletions src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,31 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchSecurityException;
import org.opensearch.action.ActionListener;
import org.opensearch.ad.constant.CommonValue;
import org.opensearch.ad.feature.FeatureManager;
import org.opensearch.ad.feature.Features;
import org.opensearch.ad.ml.ModelManager;
import org.opensearch.ad.ml.ADModelManager;
import org.opensearch.ad.ml.ThresholdingResult;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.EntityAnomalyResult;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.timeseries.constant.CommonValue;
import org.opensearch.timeseries.feature.FeatureManager;
import org.opensearch.timeseries.feature.Features;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.Feature;
import org.opensearch.timeseries.model.FeatureData;
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;

/**
* Runner to trigger an anomaly detector.
*/
public final class AnomalyDetectorRunner {

private final Logger logger = LogManager.getLogger(AnomalyDetectorRunner.class);
private final ModelManager modelManager;
private final ADModelManager modelManager;
private final FeatureManager featureManager;
private final int maxPreviewResults;

public AnomalyDetectorRunner(ModelManager modelManager, FeatureManager featureManager, int maxPreviewResults) {
public AnomalyDetectorRunner(ADModelManager modelManager, FeatureManager featureManager, int maxPreviewResults) {
this.modelManager = modelManager;
this.featureManager = featureManager;
this.maxPreviewResults = maxPreviewResults;
Expand Down Expand Up @@ -168,24 +168,24 @@ private List<AnomalyResult> parsePreviewResult(

AnomalyResult result;
if (results != null && results.size() > i) {
ThresholdingResult thresholdingResult = results.get(i);
List<AnomalyResult> resultsToSave = thresholdingResult
.toIndexableResults(
detector,
Instant.ofEpochMilli(timeRange.getKey()),
Instant.ofEpochMilli(timeRange.getValue()),
null,
null,
featureDatas,
Optional.ofNullable(entity),
CommonValue.NO_SCHEMA_VERSION,
null,
null,
null
anomalyResults
.addAll(
results
.get(i)
.toIndexableResults(
detector,
Instant.ofEpochMilli(timeRange.getKey()),
Instant.ofEpochMilli(timeRange.getValue()),
null,
null,
featureDatas,
Optional.ofNullable(entity),
CommonValue.NO_SCHEMA_VERSION,
null,
null,
null
)
);
for (AnomalyResult r : resultsToSave) {
anomalyResults.add(r);
}
} else {
result = new AnomalyResult(
detector.getId(),
Expand Down
Loading

0 comments on commit 64c066a

Please sign in to comment.