Skip to content

Commit

Permalink
[Backport 2.x] Block delete model requests if an index uses the model (
Browse files Browse the repository at this point in the history
…#1745)

* Block delete model requests if an index uses the model (#1722)

* Block delete model requests if an index uses the model

Signed-off-by: Ryan Bogan <[email protected]>

* Add test

Signed-off-by: Ryan Bogan <[email protected]>

* Add changelog entry

Signed-off-by: Ryan Bogan <[email protected]>

* Change tests that were deleting a model in use

Signed-off-by: Ryan Bogan <[email protected]>

* Debug

Signed-off-by: Ryan Bogan <[email protected]>

* Add extra check to fix incorrect exception

Signed-off-by: Ryan Bogan <[email protected]>

* Remove println

Signed-off-by: Ryan Bogan <[email protected]>

* Change model_id search from contains to parsing response map

Signed-off-by: Ryan Bogan <[email protected]>

* Remove * import

Signed-off-by: Ryan Bogan <[email protected]>

* Combine delete model exceptions

Signed-off-by: Ryan Bogan <[email protected]>

* Move check to UpdateModelGraveyardTransportAction

Signed-off-by: Ryan Bogan <[email protected]>

* Optimize code logic

Signed-off-by: Ryan Bogan <[email protected]>

* Modify method names and throw exception in outer method

Signed-off-by: Ryan Bogan <[email protected]>

* Add test with cases for UpdateModelGraveyardTransportAction

Signed-off-by: Ryan Bogan <[email protected]>

* Fix spotless

Signed-off-by: Ryan Bogan <[email protected]>

* Refactor code and other minor optimizations

Signed-off-by: Ryan Bogan <[email protected]>

---------

Signed-off-by: Ryan Bogan <[email protected]>
(cherry picked from commit 1c4a131)

* Fix compile

Signed-off-by: Ryan Bogan <[email protected]>

---------

Signed-off-by: Ryan Bogan <[email protected]>
Co-authored-by: Ryan Bogan <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and ryanbogan committed Jun 11, 2024
1 parent cd311af commit 0738439
Show file tree
Hide file tree
Showing 11 changed files with 436 additions and 58 deletions.
1 change: 1 addition & 0 deletions release-notes/opensearch-knn.release-notes-2.15.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Compatible with OpenSearch 2.15.0
* Add stats for radial search [#1684](https://github.com/opensearch-project/k-NN/pull/1684)
* Support script score when doc value is disabled and fix misusing DISI [#1696](https://github.com/opensearch-project/k-NN/pull/1696)
* Add validation for pq m parameter before training starts [#1713](https://github.com/opensearch-project/k-NN/pull/1713)
* Block delete model requests if an index uses the model [#1722](https://github.com/opensearch-project/k-NN/pull/1722)
### Bug Fixes
* Block commas in model description [#1692](https://github.com/opensearch-project/k-NN/pull/1692)
* Update threshold value after new result is added [#1715](https://github.com/opensearch-project/k-NN/pull/1715)
1 change: 1 addition & 0 deletions src/main/java/org/opensearch/knn/common/KNNConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class KNNConstants {
public static final String VECTOR = "vector";
public static final String K = "k";
public static final String TYPE_KNN_VECTOR = "knn_vector";
public static final String PROPERTIES = "properties";
public static final String METHOD_PARAMETER_EF_SEARCH = "ef_search";
public static final String METHOD_PARAMETER_EF_CONSTRUCTION = "ef_construction";
public static final String METHOD_PARAMETER_M = "m";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@
import org.opensearch.core.rest.RestStatus;

/**
* Exception thrown when a model is deleted while it is in the training state. The RestStatus associated with this
* Exception thrown when a model is deleted while it is in the training state or in use by an index. The RestStatus associated with this
* exception should be a {@link RestStatus#CONFLICT} because the request cannot be deleted due to the model being in
* the training state.
* the training state or in use by an index.
*/
public class DeleteModelWhenInTrainStateException extends OpenSearchException {
public class DeleteModelException extends OpenSearchException {
/**
* Constructor
*
* @param msg detailed exception message
* @param args arguments of the message
*/
public DeleteModelWhenInTrainStateException(String msg, Object... args) {
public DeleteModelException(String msg, Object... args) {
super(LoggerMessageFormat.format(msg, args));
}

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/org/opensearch/knn/indices/ModelDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.common.exception.DeleteModelWhenInTrainStateException;
import org.opensearch.knn.common.exception.DeleteModelException;
import org.opensearch.knn.index.MethodComponentContext;
import org.opensearch.knn.plugin.transport.DeleteModelResponse;
import org.opensearch.knn.plugin.transport.GetModelResponse;
Expand Down Expand Up @@ -84,7 +84,7 @@
public interface ModelDao {

/**
* Creates model index. It is possible that the 2 threads call this function simulateously. In this case, one
* Creates model index. It is possible that the 2 threads call this function simultaneously. In this case, one
* thread will throw a ResourceAlreadyExistsException. This should be caught and handled.
*
* @param actionListener CreateIndexResponse listener
Expand Down Expand Up @@ -527,7 +527,7 @@ public void delete(String modelId, ActionListener<DeleteModelResponse> listener)
// If model is in Training state, fail delete model request
if (ModelState.TRAINING == getModelResponse.getModel().getModelMetadata().getState()) {
String errorMessage = String.format("Cannot delete model [%s]. Model is still in training", modelId);
listener.onFailure(new DeleteModelWhenInTrainStateException(errorMessage));
listener.onFailure(new DeleteModelException(errorMessage));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import lombok.Value;
import lombok.extern.log4j.Log4j2;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.master.AcknowledgedResponse;
Expand All @@ -22,16 +23,22 @@
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.indices.IndicesService;
import org.opensearch.knn.common.exception.DeleteModelException;
import org.opensearch.knn.indices.ModelGraveyard;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static java.util.stream.Collectors.toList;
import static org.opensearch.knn.common.KNNConstants.MODEL_ID;
import static org.opensearch.knn.common.KNNConstants.PLUGIN_NAME;

/**
Expand All @@ -42,14 +49,16 @@ public class UpdateModelGraveyardTransportAction extends TransportClusterManager
UpdateModelGraveyardRequest,
AcknowledgedResponse> {
private UpdateModelGraveyardExecutor updateModelGraveyardExecutor;
private final IndicesService indicesService;

@Inject
public UpdateModelGraveyardTransportAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService
) {
super(
UpdateModelGraveyardAction.NAME,
Expand All @@ -61,6 +70,7 @@ public UpdateModelGraveyardTransportAction(
indexNameExpressionResolver
);
this.updateModelGraveyardExecutor = new UpdateModelGraveyardExecutor();
this.indicesService = indicesService;
}

@Override
Expand All @@ -82,7 +92,7 @@ protected void clusterManagerOperation(
// ClusterManager updates model graveyard based on request parameters
clusterService.submitStateUpdateTask(
PLUGIN_NAME,
new UpdateModelGraveyardTask(request.getModelId(), request.isRemoveRequest()),
new UpdateModelGraveyardTask(request.getModelId(), request.isRemoveRequest(), indicesService),
ClusterStateTaskConfig.build(Priority.NORMAL),
updateModelGraveyardExecutor,
new ClusterStateTaskListener() {
Expand Down Expand Up @@ -111,6 +121,7 @@ protected ClusterBlockException checkBlock(UpdateModelGraveyardRequest request,
private static class UpdateModelGraveyardTask {
String modelId;
boolean isRemoveRequest;
IndicesService indicesService;
}

/**
Expand All @@ -123,7 +134,8 @@ private static class UpdateModelGraveyardExecutor implements ClusterStateTaskExe
* @return Represents the result of a batched execution of cluster state update tasks (UpdateModelGraveyardTasks)
*/
@Override
public ClusterTasksResult<UpdateModelGraveyardTask> execute(ClusterState clusterState, List<UpdateModelGraveyardTask> taskList) {
public ClusterTasksResult<UpdateModelGraveyardTask> execute(ClusterState clusterState, List<UpdateModelGraveyardTask> taskList)
throws IOException {

// Check if the objects are not null and throw a customized NullPointerException
Objects.requireNonNull(clusterState, "Cluster state must not be null");
Expand All @@ -146,6 +158,17 @@ public ClusterTasksResult<UpdateModelGraveyardTask> execute(ClusterState cluster
modelGraveyard.remove(task.getModelId());
continue;
}
List<String> indicesUsingModel = getIndicesUsingModel(clusterState, task);
// Throw exception if any indices are using the model
if (!indicesUsingModel.isEmpty()) {
throw new DeleteModelException(
String.format(
"Cannot delete model [%s]. Model is in use by the following indices %s, which must be deleted first.",
task.getModelId(),
indicesUsingModel
)
);
}
modelGraveyard.add(task.getModelId());
}

Expand All @@ -155,5 +178,50 @@ public ClusterTasksResult<UpdateModelGraveyardTask> execute(ClusterState cluster
ClusterState updatedClusterState = ClusterState.builder(clusterState).metadata(metaDataBuilder).build();
return new ClusterTasksResult.Builder<UpdateModelGraveyardTask>().successes(taskList).build(updatedClusterState);
}

private List<String> getIndicesUsingModel(ClusterState clusterState, UpdateModelGraveyardTask task) throws IOException {
Map<String, IndexMetadata> indices = clusterState.metadata().indices();
String[] knnIndicesList = indices.values()
.stream()
.filter(metadata -> "true".equals(metadata.getSettings().get("index.knn", "false")))
.map(metadata -> metadata.getIndex().getName())
.toArray(String[]::new);
if (knnIndicesList.length == 0) {
return Collections.emptyList();
}

return clusterState.metadata()
.findMappings(knnIndicesList, task.getIndicesService().getFieldFilter())
.entrySet()
.stream()
.filter(entry -> entry.getValue() != null)
.filter(entry -> {
Object properties = entry.getValue().getSourceAsMap().get("properties");
if (properties == null || properties instanceof Map == false) {
return false;
}
Map propertiesMap = (Map<String, Object>) properties;
return propertiesMapContainsModel(propertiesMap, task.getModelId());
})
.map(Map.Entry::getKey)
.collect(toList());
}

private boolean propertiesMapContainsModel(Map<String, Object> propertiesMap, String modelId) {
for (Map.Entry<String, Object> fieldsEntry : propertiesMap.entrySet()) {
if (fieldsEntry.getKey() != null && fieldsEntry.getValue() instanceof Map) {
Map<String, Object> innerMap = (Map<String, Object>) fieldsEntry.getValue();
for (Map.Entry<String, Object> innerEntry : innerMap.entrySet()) {
// If model is in use, fail delete model request
if (innerEntry.getKey().equals(MODEL_ID)
&& innerEntry.getValue() instanceof String
&& innerEntry.getValue().equals(modelId)) {
return true;
}
}
}
}
return false;
}
}
}
40 changes: 36 additions & 4 deletions src/test/java/org/opensearch/knn/KNNSingleNodeTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.knn.index.query.KNNQueryBuilder;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.index.memory.NativeMemoryLoadStrategy;
import org.opensearch.knn.indices.Model;
import org.opensearch.knn.indices.ModelDao;
import org.opensearch.knn.indices.ModelMetadata;
import org.opensearch.knn.indices.ModelState;
Expand All @@ -36,13 +37,12 @@
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ExecutionException;

import static org.mockito.Mockito.when;
import static org.opensearch.knn.common.KNNConstants.*;
import static org.opensearch.knn.common.KNNConstants.MODEL_INDEX_NAME;

public class KNNSingleNodeTestCase extends OpenSearchSingleNodeTestCase {
@Override
Expand Down Expand Up @@ -181,6 +181,38 @@ protected void addDoc(String index, String docId, String fieldName, String dummy
assertEquals(response.status(), RestStatus.CREATED);
}

/**
* Index a new model
*/
protected void addDoc(Model model) throws IOException, ExecutionException, InterruptedException {
ModelMetadata modelMetadata = model.getModelMetadata();

XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field(MODEL_ID, model.getModelID())
.field(KNN_ENGINE, modelMetadata.getKnnEngine().getName())
.field(METHOD_PARAMETER_SPACE_TYPE, modelMetadata.getSpaceType().getValue())
.field(DIMENSION, modelMetadata.getDimension())
.field(MODEL_STATE, modelMetadata.getState().getName())
.field(MODEL_TIMESTAMP, modelMetadata.getTimestamp().toString())
.field(MODEL_DESCRIPTION, modelMetadata.getDescription())
.field(MODEL_ERROR, modelMetadata.getError());

if (model.getModelBlob() != null) {
builder.field(MODEL_BLOB_PARAMETER, Base64.getEncoder().encodeToString(model.getModelBlob()));
}

builder.endObject();

IndexRequest indexRequest = new IndexRequest().index(MODEL_INDEX_NAME)
.id(model.getModelID())
.source(builder)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

IndexResponse response = client().index(indexRequest).get();
assertTrue(response.status() == RestStatus.CREATED || response.status() == RestStatus.OK);
}

/**
* Run a search against a k-NN index
*/
Expand Down
1 change: 1 addition & 0 deletions src/test/java/org/opensearch/knn/index/FaissIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,7 @@ public void testSharedIndexState_whenOneIndexDeleted_thenSecondIndexIsStillSearc
// will give 15 second buffer from that
Thread.sleep(1000 * 45);
validateSearchWorkflow(secondIndexName, testData.queries, 10);
deleteKNNIndex(secondIndexName);
deleteModel(modelId);
}

Expand Down
Loading

0 comments on commit 0738439

Please sign in to comment.