[Extensions] Increasing AD Extension Request timeout to 20 and fixes exception handling for Delete/Profile (#916)

* Replacing hardcoded timeout values with AnomalyDetectorSettings.REQUEST_TIMEOUT setting, increased request timeout setting to 20, fixed affected tests

Signed-off-by: Joshua Palis <[email protected]>

* Addressing PR comments, adding TODO to opendistro legacy settings

Signed-off-by: Joshua Palis <[email protected]>

* Fixes Delete/ProfileDetector exception handling to check message rather than type

Signed-off-by: Joshua Palis <[email protected]>

* Addressing PR comments, added new constructors to IndexUtils/CheckpointDao to assume an empty settings object if not passed directly

Signed-off-by: Joshua Palis <[email protected]>

---------

Signed-off-by: Joshua Palis <[email protected]>
joshpalis committed Jun 1, 2023
1 parent 4e97313 commit c20e72d
Showing 10 changed files with 160 additions and 29 deletions.
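The core pattern of this commit, condensed into a minimal sketch: every call site that previously hard-coded a 10- or 15-second orTimeout now resolves the timeout from AnomalyDetectorSettings.REQUEST_TIMEOUT (default raised to 20 seconds). The class below is hypothetical scaffolding for illustration only; just REQUEST_TIMEOUT, Settings, and the orTimeout/join calls come from the diff.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.common.settings.Settings;

// Hypothetical helper showing the pattern applied across the changed files.
public class ConfigurableTimeoutSketch {

    private final Settings settings; // environment settings injected by the owning component

    public ConfigurableTimeoutSketch(Settings settings) {
        this.settings = settings;
    }

    public <T> T awaitWithRequestTimeout(CompletableFuture<T> future) {
        // Previously: future.orTimeout(15L, TimeUnit.SECONDS).join();
        // Now the timeout comes from the REQUEST_TIMEOUT setting, so operators can tune it without a code change.
        long timeoutMillis = AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis();
        return future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS).join();
    }
}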
12 changes: 10 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java
@@ -249,7 +249,14 @@ public Collection<Object> createComponents(ExtensionsRunner runner) {

Throttler throttler = new Throttler(getClock());
ClientUtil clientUtil = new ClientUtil(environmentSettings, restClient(), throttler);
IndexUtils indexUtils = new IndexUtils(restClient(), clientUtil, sdkClusterService, indexNameExpressionResolver, javaAsyncClient());
IndexUtils indexUtils = new IndexUtils(
restClient(),
clientUtil,
sdkClusterService,
indexNameExpressionResolver,
javaAsyncClient(),
environmentSettings
);
nodeFilter = new DiscoveryNodeFilterer(sdkClusterService);
AnomalyDetectionIndices anomalyDetectionIndices = new AnomalyDetectionIndices(
sdkRestClient,
@@ -358,7 +365,8 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
AnomalyDetectorSettings.MAX_CHECKPOINT_BYTES,
serializeRCFBufferPool,
AnomalyDetectorSettings.SERIALIZATION_BUFFER_BYTES,
1 - AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE
1 - AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE,
environmentSettings
);

Random random = new Random(42);
8 changes: 6 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
@@ -652,7 +652,9 @@ public void onFailure(Exception exception) {
}

});
Response response = acquireLockResponse.orTimeout(15L, TimeUnit.SECONDS).join();
Response response = acquireLockResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.join();

log.info("Acquired lock for AD job {}", context.getJobId());

@@ -685,7 +687,9 @@ public void onFailure(Exception exception) {
}

});
Response response = releaseLockResponse.orTimeout(15L, TimeUnit.SECONDS).join();
Response response = releaseLockResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.join();

boolean lockIsReleased = RestStatus.fromCode(response.getStatusLine().getStatusCode()) == RestStatus.OK ? true : false;
if (lockIsReleased) {
src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java
@@ -63,6 +63,7 @@
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
@@ -121,6 +122,7 @@ public class AnomalyDetectionIndices implements LocalNodeMasterListener {
private final SDKRestClient adminClient;
private final OpenSearchAsyncClient sdkJavaAsyncClient;
private final ThreadPool threadPool;
private final Settings environmentSettings;

private volatile TimeValue historyRolloverPeriod;
private volatile Long historyMaxDocs;
@@ -188,6 +190,7 @@ public AnomalyDetectionIndices(
this.sdkJavaAsyncClient = sdkJavaAsyncClient;
this.sdkClusterService = sdkClusterService;
this.threadPool = threadPool;
this.environmentSettings = settings;
// FIXME Implement this
// https://github.com/opensearch-project/opensearch-sdk-java/issues/423
// this.clusterService.addLocalNodeMasterListener(this);
@@ -1098,7 +1101,9 @@ private void updateJobIndexSettingIfNecessary(IndexState jobIndexState, ActionLi

GetIndicesSettingsResponse settingResponse;
try {
settingResponse = getIndicesSettingsResponse.orTimeout(10L, TimeUnit.SECONDS).get();
settingResponse = getIndicesSettingsResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(environmentSettings).getMillis(), TimeUnit.MILLISECONDS)
.get();
// auto expand setting is a range string like "1-all"
org.opensearch.client.opensearch.indices.IndexState indexState = settingResponse.get(ADIndex.JOB.getIndexName());
String autoExpandReplica = indexState.settings().autoExpandReplicas();
69 changes: 67 additions & 2 deletions src/main/java/org/opensearch/ad/ml/CheckpointDao.java
@@ -56,13 +56,15 @@
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
import org.opensearch.client.opensearch._types.BulkIndexByScrollFailure;
import org.opensearch.client.opensearch._types.Conflicts;
import org.opensearch.client.opensearch._types.ExpandWildcard;
import org.opensearch.client.opensearch.core.DeleteByQueryRequest;
import org.opensearch.client.opensearch.core.DeleteByQueryResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.sdk.SDKClient.SDKRestClient;

@@ -115,6 +117,7 @@ public class CheckpointDao {
// configuration
private final String indexName;

private Settings settings;
private Gson gson;
private RandomCutForestMapper mapper;

@@ -142,7 +145,7 @@ public class CheckpointDao {
private double anomalyRate;

/**
* Constructor with dependencies and configuration.
* Constructor with dependencies, configuration, empty settings
*
* @param client ES search client
* @param sdkJavaAsyncClient OpenSearch Async Client
@@ -176,9 +179,68 @@ public CheckpointDao(
GenericObjectPool<LinkedBuffer> serializeRCFBufferPool,
int serializeRCFBufferSize,
double anomalyRate
) {
this(
client,
sdkJavaAsyncClient,
clientUtil,
indexName,
gson,
mapper,
converter,
trcfMapper,
trcfSchema,
thresholdingModelClass,
indexUtil,
maxCheckpointBytes,
serializeRCFBufferPool,
serializeRCFBufferSize,
anomalyRate,
Settings.EMPTY
);
}

/**
* Constructor with dependencies and configuration.
*
* @param client ES search client
* @param sdkJavaAsyncClient OpenSearch Async Client
* @param clientUtil utility with ES client
* @param indexName name of the index for model checkpoints
* @param gson accessor to Gson functionality
* @param mapper RCF model serialization utility
* @param converter converter from rcf v1 serde to protostuff based format
* @param trcfMapper TRCF serialization mapper
* @param trcfSchema TRCF serialization schema
* @param thresholdingModelClass thresholding model's class
* @param indexUtil Index utility methods
* @param maxCheckpointBytes max checkpoint size in bytes
* @param serializeRCFBufferPool object pool for serializing rcf models
* @param serializeRCFBufferSize the size of the buffer for RCF serialization
* @param anomalyRate anomaly rate
* @param settings Environment Settings
*/
public CheckpointDao(
SDKRestClient client,
OpenSearchAsyncClient sdkJavaAsyncClient,
ClientUtil clientUtil,
String indexName,
Gson gson,
RandomCutForestMapper mapper,
V1JsonToV3StateConverter converter,
ThresholdedRandomCutForestMapper trcfMapper,
Schema<ThresholdedRandomCutForestState> trcfSchema,
Class<? extends ThresholdingModel> thresholdingModelClass,
AnomalyDetectionIndices indexUtil,
int maxCheckpointBytes,
GenericObjectPool<LinkedBuffer> serializeRCFBufferPool,
int serializeRCFBufferSize,
double anomalyRate,
Settings settings
) {
this.client = client;
this.sdkJavaAsyncClient = sdkJavaAsyncClient;
this.settings = settings;
this.clientUtil = clientUtil;
this.indexName = indexName;
this.gson = gson;
@@ -192,6 +254,7 @@ public CheckpointDao(
this.serializeRCFBufferPool = serializeRCFBufferPool;
this.serializeRCFBufferSize = serializeRCFBufferSize;
this.anomalyRate = anomalyRate;
this.settings = settings;
}

private void saveModelCheckpointSync(Map<String, Object> source, String modelId) {
@@ -452,7 +515,9 @@ public void deleteModelCheckpointByDetectorId(String detectorID) {
logger.info("Delete checkpoints of detector {}", detectorID);
try {
CompletableFuture<DeleteByQueryResponse> deleteResponse = sdkJavaAsyncClient.deleteByQuery(deleteRequest);
DeleteByQueryResponse response = deleteResponse.orTimeout(10L, TimeUnit.SECONDS).get();
DeleteByQueryResponse response = deleteResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();
if (response.timedOut() || !response.failures().isEmpty()) {
logFailure(response, detectorID);
}
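The new CheckpointDao constructor pair follows a plain delegation pattern: the original signature forwards to the extended one with Settings.EMPTY, so existing callers and tests that never supply settings keep compiling, while the extension can now inject its environment settings. A condensed, hypothetical illustration of that pattern (the class and its fields are invented for brevity):

import org.opensearch.common.settings.Settings;

// Hypothetical, trimmed-down example of the constructor-overloading approach used above.
public class SettingsAwareDao {

    private final String indexName;
    private final Settings settings;

    // Legacy signature: assume an empty settings object when none is passed directly.
    public SettingsAwareDao(String indexName) {
        this(indexName, Settings.EMPTY);
    }

    // New signature: callers such as AnomalyDetectorExtension can inject the environment settings.
    public SettingsAwareDao(String indexName, Settings settings) {
        this.indexName = indexName;
        this.settings = settings;
    }
}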
@@ -215,7 +215,9 @@ public void onFailure(Exception exception) {

});

Response response = registerJobDetailsResponse.orTimeout(15, TimeUnit.SECONDS).join();
Response response = registerJobDetailsResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.join();
this.registeredJobDetails = RestStatus.fromCode(response.getStatusLine().getStatusCode()) == RestStatus.OK ? true : false;
LOG.info("Job Details Registered : " + registeredJobDetails);
}
src/main/java/org/opensearch/ad/settings/LegacyOpenDistroAnomalyDetectorSettings.java
@@ -54,10 +54,11 @@ private LegacyOpenDistroAnomalyDetectorSettings() {}
Setting.Property.Deprecated
);

// TODO : Revert setting value back to 10 seconds once there is support for more targeted cluster state requests
public static final Setting<TimeValue> REQUEST_TIMEOUT = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.request_timeout",
TimeValue.timeValueSeconds(10),
TimeValue.timeValueSeconds(20),
Setting.Property.NodeScope,
Setting.Property.Dynamic,
Setting.Property.Deprecated
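For context, a hedged sketch of how the bumped default behaves when the setting is resolved. The property key and the 20-second default come from the diff; the import paths, the 45s override, and the surrounding main method are assumptions for illustration.

import org.opensearch.ad.settings.LegacyOpenDistroAnomalyDetectorSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

public class RequestTimeoutDefaultExample {
    public static void main(String[] args) {
        // With no explicit configuration, the legacy setting now resolves to the new 20s default.
        TimeValue resolved = LegacyOpenDistroAnomalyDetectorSettings.REQUEST_TIMEOUT.get(Settings.EMPTY);
        System.out.println(resolved); // 20s

        // Operators can still override it per node; positiveTimeSetting rejects negative values.
        Settings configured = Settings.builder().put("opendistro.anomaly_detection.request_timeout", "45s").build();
        System.out.println(LegacyOpenDistroAnomalyDetectorSettings.REQUEST_TIMEOUT.get(configured)); // 45s
    }
}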
46 changes: 34 additions & 12 deletions src/main/java/org/opensearch/ad/task/ADTaskManager.java
@@ -121,6 +121,7 @@
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
import org.opensearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.transport.ADBatchAnomalyResultAction;
import org.opensearch.ad.transport.ADBatchAnomalyResultRequest;
import org.opensearch.ad.transport.ADCancelTaskAction;
@@ -212,6 +213,8 @@ public class ADTaskManager {
private static int DEFAULT_MAINTAIN_INTERVAL_IN_SECONDS = 5;
private final Semaphore checkingTaskSlot;

private final Settings settings;

private volatile Integer maxAdBatchTaskPerNode;
private volatile Integer maxRunningEntitiesPerDetector;

Expand All @@ -238,6 +241,7 @@ public ADTaskManager(
this.nodeFilter = nodeFilter;
this.sdkClusterService = sdkClusterService;
this.adTaskCacheManager = adTaskCacheManager;
this.settings = settings;
/* MultiNode support https://github.com/opensearch-project/opensearch-sdk-java/issues/200 */
// this.hashRing = hashRing;

@@ -1377,7 +1381,9 @@ private void resetEntityTasksAsStopped(String detectorTaskId) {

try {
CompletableFuture<UpdateByQueryResponse> updateByQueryResponse = sdkJavaAsyncClient.updateByQuery(updateByQueryRequest.build());
UpdateByQueryResponse queryResponse = updateByQueryResponse.orTimeout(10L, TimeUnit.SECONDS).get();
UpdateByQueryResponse queryResponse = updateByQueryResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();
List<BulkIndexByScrollFailure> bulkFailures = queryResponse.failures();
if (isNullOrEmpty(bulkFailures)) {
logger.debug("Updated {} child entity tasks state for detector task {}", queryResponse.updated(), detectorTaskId);
@@ -1573,7 +1579,9 @@ private void updateLatestFlagOfOldTasksAndCreateNewTask(

try {
CompletableFuture<UpdateByQueryResponse> updateByQueryResponse = sdkJavaAsyncClient.updateByQuery(updateByQueryRequest.build());
UpdateByQueryResponse queryResponse = updateByQueryResponse.orTimeout(10L, TimeUnit.SECONDS).get();
UpdateByQueryResponse queryResponse = updateByQueryResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();

List<BulkIndexByScrollFailure> bulkFailures = queryResponse.failures();
if (isNullOrEmpty(bulkFailures)) {
@@ -1776,7 +1784,9 @@ protected <T> void deleteTaskDocs(
BulkRequest bulkRequest = new BulkRequest.Builder().operations(operations).build();
try {
CompletableFuture<BulkResponse> bulkResponse = sdkJavaAsyncClient.bulk(bulkRequest);
BulkResponse res = bulkResponse.orTimeout(10L, TimeUnit.SECONDS).get();
BulkResponse res = bulkResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();

logger.info("Old AD tasks deleted for detector {}", detectorId);
List<BulkResponseItem> bulkItemResponses = res.items();
@@ -1833,7 +1843,9 @@ public void cleanChildTasksAndADResultsOfDeletedTask() {
.build();
try {
CompletableFuture<DeleteByQueryResponse> deleteADResultsResponse = sdkJavaAsyncClient.deleteByQuery(deleteADResultsRequest);
DeleteByQueryResponse res = deleteADResultsResponse.orTimeout(10L, TimeUnit.SECONDS).get();
DeleteByQueryResponse res = deleteADResultsResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();

logger.debug("Successfully deleted AD results of task " + taskId);
DeleteByQueryRequest deleteChildTasksRequest = new DeleteByQueryRequest.Builder()
@@ -1845,7 +1857,9 @@ public void cleanChildTasksAndADResultsOfDeletedTask() {
try {
CompletableFuture<DeleteByQueryResponse> deleteChildTasksResponse = sdkJavaAsyncClient
.deleteByQuery(deleteChildTasksRequest);
DeleteByQueryResponse deleteByQueryResponse = deleteChildTasksResponse.orTimeout(10L, TimeUnit.SECONDS).get();
DeleteByQueryResponse deleteByQueryResponse = deleteChildTasksResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();

logger.debug("Successfully deleted child tasks of task " + taskId);
cleanChildTasksAndADResultsOfDeletedTask();
@@ -2014,7 +2028,9 @@ public void deleteADTasks(String detectorId, AnomalyDetectorFunction function, A
request.query(q -> q.bool(query.build()));
try {
CompletableFuture<DeleteByQueryResponse> deleteByQueryResponse = sdkJavaAsyncClient.deleteByQuery(request.build());
DeleteByQueryResponse response = deleteByQueryResponse.orTimeout(10L, TimeUnit.SECONDS).get();
DeleteByQueryResponse response = deleteByQueryResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();

if (response.failures() == null || response.failures().size() == 0) {
logger.info("AD tasks deleted for detector {}", detectorId);
@@ -2023,11 +2039,13 @@ public void deleteADTasks(String detectorId, AnomalyDetectorFunction function, A
} else {
listener.onFailure(new OpenSearchStatusException("Failed to delete all AD tasks", RestStatus.INTERNAL_SERVER_ERROR));
}
} catch (IndexNotFoundException e) {
deleteADResultOfDetector(detectorId);
function.execute();
} catch (Exception e) {
listener.onFailure(e);
if (e.getMessage().contains("index_not_found_exception")) {
deleteADResultOfDetector(detectorId);
function.execute();
} else {
listener.onFailure(e);
}
}
}

Expand All @@ -2045,7 +2063,9 @@ private void deleteADResultOfDetector(String detectorId) {
try {
CompletableFuture<DeleteByQueryResponse> deleteADResultsResponse = sdkJavaAsyncClient
.deleteByQuery(deleteADResultsRequest.build());
DeleteByQueryResponse deleteByQueryResponse = deleteADResultsResponse.orTimeout(10L, TimeUnit.SECONDS).get();
DeleteByQueryResponse deleteByQueryResponse = deleteADResultsResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();
logger.debug("Successfully deleted AD results of detector " + detectorId);
} catch (Exception exception) {
logger.error("Failed to delete AD results of detector " + detectorId, exception);
@@ -3037,7 +3057,9 @@ public void resetLatestFlagAsFalse(List<ADTask> adTasks) {

try {
CompletableFuture<BulkResponse> bulkResponse = sdkJavaAsyncClient.bulk(bulkRequest);
BulkResponse res = bulkResponse.orTimeout(10L, TimeUnit.SECONDS).get();
BulkResponse res = bulkResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();

List<BulkResponseItem> bulkItemResponses = res.items();
if (bulkItemResponses != null && bulkItemResponses.size() > 0) {
@@ -44,7 +44,6 @@
import org.opensearch.ad.util.RestHandlerUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.rest.RestStatus;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.sdk.SDKClient.SDKRestClient;
@@ -147,7 +146,7 @@ private void deleteAnomalyDetectorJobDoc(String detectorId, ActionListener<Delet
}
}, exception -> {
LOG.error("Failed to delete AD job for " + detectorId, exception);
if (exception instanceof IndexNotFoundException) {
if (exception.getMessage().contains("index_not_found_exception")) {
deleteDetectorStateDoc(detectorId, listener);
} else {
LOG.error("Failed to delete anomaly detector job", exception);
@@ -163,7 +162,7 @@ private void deleteDetectorStateDoc(String detectorId, ActionListener<DeleteResp
// whether deleted state doc or not, continue as state doc may not exist
deleteAnomalyDetectorDoc(detectorId, listener);
}, exception -> {
if (exception instanceof IndexNotFoundException) {
if (exception.getMessage().contains("index_not_found_exception")) {
deleteAnomalyDetectorDoc(detectorId, listener);
} else {
LOG.error("Failed to delete detector state", exception);
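The exception-handling change above swaps instanceof IndexNotFoundException checks for a message check, presumably because the SDK REST client can surface the missing-index condition as a different exception type, so only the message reliably identifies it. A minimal, hypothetical helper capturing the pattern; the string "index_not_found_exception" is the value checked in the diff, while the helper itself (and its null guard) is not part of the commit.

// Hypothetical helper centralizing the message-based check the diff performs inline in each handler.
public final class IndexNotFoundChecks {

    private IndexNotFoundChecks() {}

    public static boolean isIndexNotFound(Exception e) {
        // Defensive null check added here; the handlers in the diff call getMessage() directly.
        return e.getMessage() != null && e.getMessage().contains("index_not_found_exception");
    }
}

Usage mirroring the handlers above: if isIndexNotFound(exception) is true, the missing index is treated as "nothing left to delete" and the next deletion step runs; otherwise the exception is passed to listener.onFailure.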