[Extensions] Increase AD Extension request timeout to 20 seconds and fix exception handling for Delete/Profile #916

Merged (4 commits, Jun 1, 2023)
Changes from 3 commits
@@ -249,7 +249,14 @@ public Collection<Object> createComponents(ExtensionsRunner runner) {

Throttler throttler = new Throttler(getClock());
ClientUtil clientUtil = new ClientUtil(environmentSettings, restClient(), throttler);
IndexUtils indexUtils = new IndexUtils(restClient(), clientUtil, sdkClusterService, indexNameExpressionResolver, javaAsyncClient());
IndexUtils indexUtils = new IndexUtils(
restClient(),
clientUtil,
sdkClusterService,
indexNameExpressionResolver,
javaAsyncClient(),
environmentSettings
Review comment (Member): Similar comment here; we do have the settings at the end, but can we just add an overloaded constructor that assumes empty settings if not passed, so we don't have to rewrite existing constructor calls? (See the overload sketch after this file's diff.)

);
nodeFilter = new DiscoveryNodeFilterer(sdkClusterService);
AnomalyDetectionIndices anomalyDetectionIndices = new AnomalyDetectionIndices(
sdkRestClient,
@@ -342,6 +349,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
CheckpointDao checkpoint = new CheckpointDao(
sdkRestClient,
sdkJavaAsyncClient,
environmentSettings,
clientUtil,
CommonName.CHECKPOINT_INDEX_NAME,
gson,
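The review comment above suggests an overloaded constructor that assumes empty settings, so existing call sites keep compiling unchanged. A minimal sketch of that delegation pattern, using a deliberately simplified, hypothetical class rather than the real IndexUtils or CheckpointDao parameter lists:

import org.opensearch.common.settings.Settings;

// Hypothetical class, used only to illustrate the overload-with-default pattern
// suggested in the review; it is not the actual IndexUtils/CheckpointDao signature.
public class ExampleDao {

    private final String indexName;
    private final Settings settings;

    // Existing call sites keep using this overload, which assumes empty settings.
    public ExampleDao(String indexName) {
        this(indexName, Settings.EMPTY);
    }

    // New call sites that need to read REQUEST_TIMEOUT pass real settings explicitly.
    public ExampleDao(String indexName, Settings settings) {
        this.indexName = indexName;
        this.settings = settings;
    }
}

With the delegating overload in place, only the call sites that actually need the configurable timeout would have to change.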
@@ -652,7 +652,9 @@ public void onFailure(Exception exception) {
}

});
Response response = acquireLockResponse.orTimeout(15L, TimeUnit.SECONDS).join();
Response response = acquireLockResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.join();

log.info("Acquired lock for AD job {}", context.getJobId());

@@ -685,7 +687,9 @@ public void onFailure(Exception exception) {
}

});
Response response = releaseLockResponse.orTimeout(15L, TimeUnit.SECONDS).join();
Response response = releaseLockResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.join();

boolean lockIsReleased = RestStatus.fromCode(response.getStatusLine().getStatusCode()) == RestStatus.OK ? true : false;
if (lockIsReleased) {
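The two lock hunks above replace a hard-coded 15-second bound with the configurable REQUEST_TIMEOUT value, still using the orTimeout(...).join() idiom. As a plain-JDK reminder (no OpenSearch types involved), orTimeout completes the future exceptionally with a TimeoutException once the deadline passes, and join surfaces that as an unchecked CompletionException:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OrTimeoutJoinSketch {
    public static void main(String[] args) {
        // A future that never completes, standing in for a slow extension request.
        CompletableFuture<String> pending = new CompletableFuture<>();
        try {
            // orTimeout fails the future after the deadline; join rethrows the
            // failure wrapped in an unchecked CompletionException.
            String response = pending.orTimeout(100L, TimeUnit.MILLISECONDS).join();
            System.out.println("Response: " + response);
        } catch (CompletionException e) {
            if (e.getCause() instanceof TimeoutException) {
                System.out.println("Request timed out");
            } else {
                throw e;
            }
        }
    }
}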
@@ -63,6 +63,7 @@
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
@@ -121,6 +122,7 @@ public class AnomalyDetectionIndices implements LocalNodeMasterListener {
private final SDKRestClient adminClient;
private final OpenSearchAsyncClient sdkJavaAsyncClient;
private final ThreadPool threadPool;
private final Settings environmentSettings;

private volatile TimeValue historyRolloverPeriod;
private volatile Long historyMaxDocs;
@@ -188,6 +190,7 @@ public AnomalyDetectionIndices(
this.sdkJavaAsyncClient = sdkJavaAsyncClient;
this.sdkClusterService = sdkClusterService;
this.threadPool = threadPool;
this.environmentSettings = settings;
// FIXME Implement this
// https://github.com/opensearch-project/opensearch-sdk-java/issues/423
// this.clusterService.addLocalNodeMasterListener(this);
@@ -1098,7 +1101,9 @@ private void updateJobIndexSettingIfNecessary(IndexState jobIndexState, ActionLi

GetIndicesSettingsResponse settingResponse;
try {
settingResponse = getIndicesSettingsResponse.orTimeout(10L, TimeUnit.SECONDS).get();
settingResponse = getIndicesSettingsResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(environmentSettings).getMillis(), TimeUnit.MILLISECONDS)
.get();
// auto expand setting is a range string like "1-all"
org.opensearch.client.opensearch.indices.IndexState indexState = settingResponse.get(ADIndex.JOB.getIndexName());
String autoExpandReplica = indexState.settings().autoExpandReplicas();
10 changes: 9 additions & 1 deletion src/main/java/org/opensearch/ad/ml/CheckpointDao.java
@@ -56,13 +56,15 @@
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
import org.opensearch.client.opensearch._types.BulkIndexByScrollFailure;
import org.opensearch.client.opensearch._types.Conflicts;
import org.opensearch.client.opensearch._types.ExpandWildcard;
import org.opensearch.client.opensearch.core.DeleteByQueryRequest;
import org.opensearch.client.opensearch.core.DeleteByQueryResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.sdk.SDKClient.SDKRestClient;

@@ -114,6 +116,7 @@ public class CheckpointDao {

// configuration
private final String indexName;
private final Settings settings;

private Gson gson;
private RandomCutForestMapper mapper;
@@ -146,6 +149,7 @@ public class CheckpointDao {
*
* @param client ES search client
* @param sdkJavaAsyncClient OpenSearch Async Client
* @param settings Environment Settings
* @param clientUtil utility with ES client
* @param indexName name of the index for model checkpoints
* @param gson accessor to Gson functionality
@@ -163,6 +167,7 @@ public class CheckpointDao {
public CheckpointDao(
SDKRestClient client,
OpenSearchAsyncClient sdkJavaAsyncClient,
Settings settings,
Review comment (Member): I'm really iffy on adding a new parameter to an already-long list, but if we do so it should probably be at the end of the list, not in the middle. Also, we could add an overload that delegates from the previous version to the new version with Settings.EMPTY to avoid having to change every single existing call. (See the overload sketch after the first file's diff above.)

ClientUtil clientUtil,
String indexName,
Gson gson,
@@ -179,6 +184,7 @@ public CheckpointDao(
) {
this.client = client;
this.sdkJavaAsyncClient = sdkJavaAsyncClient;
this.settings = settings;
this.clientUtil = clientUtil;
this.indexName = indexName;
this.gson = gson;
@@ -452,7 +458,9 @@ public void deleteModelCheckpointByDetectorId(String detectorID) {
logger.info("Delete checkpoints of detector {}", detectorID);
try {
CompletableFuture<DeleteByQueryResponse> deleteResponse = sdkJavaAsyncClient.deleteByQuery(deleteRequest);
DeleteByQueryResponse response = deleteResponse.orTimeout(10L, TimeUnit.SECONDS).get();
DeleteByQueryResponse response = deleteResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();
if (response.timedOut() || !response.failures().isEmpty()) {
logFailure(response, detectorID);
}
@@ -215,7 +215,9 @@ public void onFailure(Exception exception) {

});

Response response = registerJobDetailsResponse.orTimeout(15, TimeUnit.SECONDS).join();
Response response = registerJobDetailsResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.join();
this.registeredJobDetails = RestStatus.fromCode(response.getStatusLine().getStatusCode()) == RestStatus.OK ? true : false;
LOG.info("Job Details Registered : " + registeredJobDetails);
}
@@ -54,10 +54,11 @@ private LegacyOpenDistroAnomalyDetectorSettings() {}
Setting.Property.Deprecated
);

// TODO : Revert setting value back to 10 seconds once there is support for more targeted cluster state requests
public static final Setting<TimeValue> REQUEST_TIMEOUT = Setting
.positiveTimeSetting(
"opendistro.anomaly_detection.request_timeout",
TimeValue.timeValueSeconds(10),
TimeValue.timeValueSeconds(20),
Review comment (Member): Can you add a TODO here to revert this back once we have support for cluster state? That's one of the major reasons for the timeout.
Reply (Member, Author): Sure.

Setting.Property.NodeScope,
Setting.Property.Dynamic,
Setting.Property.Deprecated
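For context on the default being raised above: REQUEST_TIMEOUT is a positive time setting, and the call sites changed in this PR read it in milliseconds via AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(). A minimal sketch of defining and reading such a setting; the holder class is hypothetical, and the real legacy setting shown above additionally carries Setting.Property.Deprecated:

import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

// Hypothetical holder class; only the define-and-read pattern is the point.
public class RequestTimeoutSketch {

    // Dynamic, node-scoped timeout defaulting to 20 seconds, mirroring the bump in this PR.
    public static final Setting<TimeValue> REQUEST_TIMEOUT = Setting
        .positiveTimeSetting(
            "opendistro.anomaly_detection.request_timeout",
            TimeValue.timeValueSeconds(20),
            Setting.Property.NodeScope,
            Setting.Property.Dynamic
        );

    public static long requestTimeoutMillis(Settings settings) {
        // Falls back to the 20-second default when the key is absent (e.g. Settings.EMPTY).
        return REQUEST_TIMEOUT.get(settings).getMillis();
    }

    public static void main(String[] args) {
        System.out.println(requestTimeoutMillis(Settings.EMPTY)); // prints 20000
    }
}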
46 changes: 34 additions & 12 deletions src/main/java/org/opensearch/ad/task/ADTaskManager.java
@@ -121,6 +121,7 @@
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
import org.opensearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.transport.ADBatchAnomalyResultAction;
import org.opensearch.ad.transport.ADBatchAnomalyResultRequest;
import org.opensearch.ad.transport.ADCancelTaskAction;
@@ -212,6 +213,8 @@ public class ADTaskManager {
private static int DEFAULT_MAINTAIN_INTERVAL_IN_SECONDS = 5;
private final Semaphore checkingTaskSlot;

private final Settings settings;

private volatile Integer maxAdBatchTaskPerNode;
private volatile Integer maxRunningEntitiesPerDetector;

@@ -238,6 +241,7 @@ public ADTaskManager(
this.nodeFilter = nodeFilter;
this.sdkClusterService = sdkClusterService;
this.adTaskCacheManager = adTaskCacheManager;
this.settings = settings;
/* MultiNode support https://github.com/opensearch-project/opensearch-sdk-java/issues/200 */
// this.hashRing = hashRing;

@@ -1377,7 +1381,9 @@ private void resetEntityTasksAsStopped(String detectorTaskId) {

try {
CompletableFuture<UpdateByQueryResponse> updateByQueryResponse = sdkJavaAsyncClient.updateByQuery(updateByQueryRequest.build());
UpdateByQueryResponse queryResponse = updateByQueryResponse.orTimeout(10L, TimeUnit.SECONDS).get();
UpdateByQueryResponse queryResponse = updateByQueryResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();
List<BulkIndexByScrollFailure> bulkFailures = queryResponse.failures();
if (isNullOrEmpty(bulkFailures)) {
logger.debug("Updated {} child entity tasks state for detector task {}", queryResponse.updated(), detectorTaskId);
@@ -1573,7 +1579,9 @@ private void updateLatestFlagOfOldTasksAndCreateNewTask(

try {
CompletableFuture<UpdateByQueryResponse> updateByQueryResponse = sdkJavaAsyncClient.updateByQuery(updateByQueryRequest.build());
UpdateByQueryResponse queryResponse = updateByQueryResponse.orTimeout(10L, TimeUnit.SECONDS).get();
UpdateByQueryResponse queryResponse = updateByQueryResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();

List<BulkIndexByScrollFailure> bulkFailures = queryResponse.failures();
if (isNullOrEmpty(bulkFailures)) {
@@ -1776,7 +1784,9 @@ protected <T> void deleteTaskDocs(
BulkRequest bulkRequest = new BulkRequest.Builder().operations(operations).build();
try {
CompletableFuture<BulkResponse> bulkResponse = sdkJavaAsyncClient.bulk(bulkRequest);
BulkResponse res = bulkResponse.orTimeout(10L, TimeUnit.SECONDS).get();
BulkResponse res = bulkResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();

logger.info("Old AD tasks deleted for detector {}", detectorId);
List<BulkResponseItem> bulkItemResponses = res.items();
@@ -1833,7 +1843,9 @@ public void cleanChildTasksAndADResultsOfDeletedTask() {
.build();
try {
CompletableFuture<DeleteByQueryResponse> deleteADResultsResponse = sdkJavaAsyncClient.deleteByQuery(deleteADResultsRequest);
DeleteByQueryResponse res = deleteADResultsResponse.orTimeout(10L, TimeUnit.SECONDS).get();
DeleteByQueryResponse res = deleteADResultsResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();

logger.debug("Successfully deleted AD results of task " + taskId);
DeleteByQueryRequest deleteChildTasksRequest = new DeleteByQueryRequest.Builder()
@@ -1845,7 +1857,9 @@ public void cleanChildTasksAndADResultsOfDeletedTask() {
try {
CompletableFuture<DeleteByQueryResponse> deleteChildTasksResponse = sdkJavaAsyncClient
.deleteByQuery(deleteChildTasksRequest);
DeleteByQueryResponse deleteByQueryResponse = deleteChildTasksResponse.orTimeout(10L, TimeUnit.SECONDS).get();
DeleteByQueryResponse deleteByQueryResponse = deleteChildTasksResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();

logger.debug("Successfully deleted child tasks of task " + taskId);
cleanChildTasksAndADResultsOfDeletedTask();
@@ -2014,7 +2028,9 @@ public void deleteADTasks(String detectorId, AnomalyDetectorFunction function, A
request.query(q -> q.bool(query.build()));
try {
CompletableFuture<DeleteByQueryResponse> deleteByQueryResponse = sdkJavaAsyncClient.deleteByQuery(request.build());
DeleteByQueryResponse response = deleteByQueryResponse.orTimeout(10L, TimeUnit.SECONDS).get();
DeleteByQueryResponse response = deleteByQueryResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();

if (response.failures() == null || response.failures().size() == 0) {
logger.info("AD tasks deleted for detector {}", detectorId);
@@ -2023,11 +2039,13 @@ public void deleteADTasks(String detectorId, AnomalyDetectorFunction function, A
} else {
listener.onFailure(new OpenSearchStatusException("Failed to delete all AD tasks", RestStatus.INTERNAL_SERVER_ERROR));
}
} catch (IndexNotFoundException e) {
deleteADResultOfDetector(detectorId);
function.execute();
} catch (Exception e) {
listener.onFailure(e);
if (e.getMessage().contains("index_not_found_exception")) {
deleteADResultOfDetector(detectorId);
function.execute();
} else {
listener.onFailure(e);
}
}
}

@@ -2045,7 +2063,9 @@ private void deleteADResultOfDetector(String detectorId) {
try {
CompletableFuture<DeleteByQueryResponse> deleteADResultsResponse = sdkJavaAsyncClient
.deleteByQuery(deleteADResultsRequest.build());
DeleteByQueryResponse deleteByQueryResponse = deleteADResultsResponse.orTimeout(10L, TimeUnit.SECONDS).get();
DeleteByQueryResponse deleteByQueryResponse = deleteADResultsResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();
logger.debug("Successfully deleted AD results of detector " + detectorId);
} catch (Exception exception) {
logger.error("Failed to delete AD results of detector " + detectorId, exception);
@@ -3037,7 +3057,9 @@ public void resetLatestFlagAsFalse(List<ADTask> adTasks) {

try {
CompletableFuture<BulkResponse> bulkResponse = sdkJavaAsyncClient.bulk(bulkRequest);
BulkResponse res = bulkResponse.orTimeout(10L, TimeUnit.SECONDS).get();
BulkResponse res = bulkResponse
.orTimeout(AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings).getMillis(), TimeUnit.MILLISECONDS)
.get();

List<BulkResponseItem> bulkItemResponses = res.items();
if (bulkItemResponses != null && bulkItemResponses.size() > 0) {
@@ -44,7 +44,6 @@
import org.opensearch.ad.util.RestHandlerUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.rest.RestStatus;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.sdk.SDKClient.SDKRestClient;
@@ -147,7 +146,7 @@ private void deleteAnomalyDetectorJobDoc(String detectorId, ActionListener<Delet
}
}, exception -> {
LOG.error("Failed to delete AD job for " + detectorId, exception);
if (exception instanceof IndexNotFoundException) {
if (exception.getMessage().contains("index_not_found_exception")) {
deleteDetectorStateDoc(detectorId, listener);
} else {
LOG.error("Failed to delete anomaly detector job", exception);
@@ -163,7 +162,7 @@ private void deleteDetectorStateDoc(String detectorId, ActionListener<DeleteResp
// whether deleted state doc or not, continue as state doc may not exist
deleteAnomalyDetectorDoc(detectorId, listener);
}, exception -> {
if (exception instanceof IndexNotFoundException) {
if (exception.getMessage().contains("index_not_found_exception")) {
deleteAnomalyDetectorDoc(detectorId, listener);
} else {
LOG.error("Failed to delete detector state", exception);
4 changes: 1 addition & 3 deletions src/main/java/org/opensearch/ad/util/ExceptionUtil.java
@@ -19,7 +19,6 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.NoShardAvailableActionException;
import org.opensearch.action.UnavailableShardsException;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.replication.ReplicationResponse;
@@ -28,7 +27,6 @@
import org.opensearch.ad.common.exception.LimitExceededException;
import org.opensearch.common.io.stream.NotSerializableExceptionWrapper;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.rest.RestStatus;

public class ExceptionUtil {
@@ -186,6 +184,6 @@ public static boolean isIndexNotAvailable(Exception e) {
if (e == null) {
return false;
}
return e instanceof IndexNotFoundException || e instanceof NoShardAvailableActionException;
return (e.getMessage().contains("index_not_found_exception")) || (e.getMessage().contains("no_shard_available_action_exception"));
}
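The Delete/Profile changes in this PR swap instanceof checks against IndexNotFoundException and NoShardAvailableActionException for matching on the exception message, presumably because the failures surfaced by the SDK clients arrive as wrapped exceptions whose message, rather than type, identifies the underlying OpenSearch error. A minimal sketch of that pattern; the helper name is hypothetical, and the null-message guard is a defensive addition rather than part of the original diff:

// Hypothetical helper illustrating the message-matching approach used in this PR;
// the null check is a defensive addition, not part of the original change.
public final class IndexErrorMatcherSketch {

    private IndexErrorMatcherSketch() {}

    public static boolean isIndexNotAvailable(Exception e) {
        if (e == null || e.getMessage() == null) {
            return false;
        }
        String message = e.getMessage();
        return message.contains("index_not_found_exception")
            || message.contains("no_shard_available_action_exception");
    }

    public static void main(String[] args) {
        // A wrapped error whose message carries the OpenSearch error type.
        Exception wrapped = new RuntimeException("[index_not_found_exception] no such index [some-index]");
        System.out.println(isIndexNotAvailable(wrapped));                      // true
        System.out.println(isIndexNotAvailable(new RuntimeException("boom"))); // false
    }
}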
}
Loading