Skip to content

Commit

Permalink
Added DeleteIndex Step and tests
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 committed Mar 13, 2024
1 parent a182b2d commit e3efb04
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.flowframework.workflow.CreateSearchPipelineStep;
import org.opensearch.flowframework.workflow.DeleteAgentStep;
import org.opensearch.flowframework.workflow.DeleteConnectorStep;
import org.opensearch.flowframework.workflow.DeleteIndexStep;
import org.opensearch.flowframework.workflow.DeleteModelStep;
import org.opensearch.flowframework.workflow.DeployModelStep;
import org.opensearch.flowframework.workflow.NoOpStep;
Expand Down Expand Up @@ -57,7 +58,7 @@ public enum WorkflowResources {
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step
/** Workflow steps for creating an index and associated created resource */
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, null), // TODO delete step
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, DeleteIndexStep.NAME),
/** Workflow steps for registering/deleting an agent and the associated created resource */
REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.Collections;

import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;
Expand Down Expand Up @@ -86,7 +86,7 @@ public PlainActionFuture<WorkflowData> execute(
byte[] byteArr = configurations.getBytes(StandardCharsets.UTF_8);
BytesReference configurationsBytes = new BytesArray(byteArr);

CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).mapping(configurationsBytes, XContentType.JSON);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).source(configurationsBytes, XContentType.JSON);
client.admin().indices().create(createIndexRequest, ActionListener.wrap(acknowledgedResponse -> {
String resourceName = getResourceByWorkflowStep(getName());
logger.info("Created index: {}", indexName);
Expand Down Expand Up @@ -118,7 +118,7 @@ public PlainActionFuture<WorkflowData> execute(
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(ex)));

Check warning on line 118 in src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java#L117-L118

Added lines #L117 - L118 were not covered by tests
}
}, e -> {
String errorMessage = "Failed to create index";
String errorMessage = "Failed to create the index " + indexName;
logger.error(errorMessage, e);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.util.ParseUtils;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;

/**
* Step to delete an index
*/
public class DeleteIndexStep implements WorkflowStep {
private static final Logger logger = LogManager.getLogger(DeleteConnectorStep.class);

private final Client client;

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "delete_index";

/**
* Instantiate this class
*
* @param client Client to create an index
*/
public DeleteIndexStep(Client client) {
this.client = client;
}

@Override
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
) {

PlainActionFuture<WorkflowData> deleteIndexFuture = PlainActionFuture.newFuture();

Set<String> requiredKeys = Set.of(INDEX_NAME);
Set<String> optionalKeys = Collections.emptySet();

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);
String indexName = (String) inputs.get(INDEX_NAME);

DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
client.admin().indices().delete(deleteIndexRequest, new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse response) {
logger.info("Deleted index: {}", indexName);
deleteIndexFuture.onResponse(
new WorkflowData(
Map.ofEntries(Map.entry(INDEX_NAME, indexName)),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to delete the index " + indexName;
logger.error(errorMessage, e);
deleteIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
});
} catch (FlowFrameworkException e) {
deleteIndexFuture.onFailure(e);

Check warning on line 95 in src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java#L94-L95

Added lines #L94 - L95 were not covered by tests
}

return deleteIndexFuture;
}

@Override
public String getName() {
return NAME;

Check warning on line 103 in src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java#L103

Added line #L103 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@
import static org.opensearch.flowframework.common.CommonValue.TYPE;
import static org.opensearch.flowframework.common.CommonValue.URL;
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.*;
import static org.opensearch.flowframework.common.WorkflowResources.AGENT_ID;
import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;

/**
* Generates instances implementing {@link WorkflowStep}.
Expand Down Expand Up @@ -80,6 +84,7 @@ public WorkflowStepFactory(
) {
stepMap.put(NoOpStep.NAME, NoOpStep::new);
stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler));
stepMap.put(DeleteIndexStep.NAME, () -> new DeleteIndexStep(client));
stepMap.put(
RegisterLocalCustomModelStep.NAME,
() -> new RegisterLocalCustomModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings)
Expand Down Expand Up @@ -185,6 +190,9 @@ public enum WorkflowSteps {
null
),

/** Delete Index Step */
DELETE_INDEX(DeleteIndexStep.NAME, List.of(INDEX_NAME), List.of(INDEX_NAME), Collections.emptyList(), null),

/** Deploy Model Step */
DEPLOY_MODEL(DeployModelStep.NAME, List.of(MODEL_ID), List.of(MODEL_ID), List.of(OPENSEARCH_ML), TimeValue.timeValueSeconds(15)),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testParseWorkflowValidator() throws IOException {

WorkflowValidator validator = new WorkflowValidator(workflowStepValidators);

assertEquals(16, validator.getWorkflowStepValidators().size());
assertEquals(18, validator.getWorkflowStepValidators().size());

assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector"));
assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
Expand All @@ -38,6 +35,7 @@
import org.mockito.MockitoAnnotations;

import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;
Expand All @@ -58,8 +56,6 @@ public class CreateIndexStepTests extends OpenSearchTestCase {
private ThreadContext threadContext;
private Metadata metadata;

@Mock
private ClusterService clusterService;
@Mock
private IndicesAdminClient indicesAdminClient;
@Mock
Expand All @@ -73,12 +69,14 @@ public void setUp() throws Exception {
super.setUp();
this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class);
MockitoAnnotations.openMocks(this);
String configurations =
"{\"settings\":{\"index\":{\"number_of_shards\":2,\"number_of_replicas\":1}},\"mappings\":{\"_doc\":{\"properties\":{\"age\":{\"type\":\"integer\"}}}},\"aliases\":{\"sample-alias1\":{}}}";

inputData = new WorkflowData(
Map.ofEntries(Map.entry(INDEX_NAME, "demo"), Map.entry("default_mapping_option", "knn")),
Map.ofEntries(Map.entry(INDEX_NAME, "demo"), Map.entry(CONFIGURATIONS, configurations)),
"test-id",
"test-node-id"
);
clusterService = mock(ClusterService.class);
client = mock(Client.class);
adminClient = mock(AdminClient.class);
metadata = mock(Metadata.class);
Expand All @@ -89,7 +87,6 @@ public void setUp() throws Exception {
when(threadPool.getThreadContext()).thenReturn(threadContext);
when(client.admin()).thenReturn(adminClient);
when(adminClient.indices()).thenReturn(indicesAdminClient);
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test cluster")).build());
when(metadata.indices()).thenReturn(Map.of(GLOBAL_CONTEXT_INDEX, indexMetadata));

createIndexStep = new CreateIndexStep(client, flowFrameworkIndicesHandler);
Expand Down Expand Up @@ -141,6 +138,6 @@ public void testCreateIndexStepFailure() throws ExecutionException, InterruptedE
assertTrue(future.isDone());
ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent());
assertTrue(ex.getCause() instanceof Exception);
assertEquals("Failed to create an index", ex.getCause().getMessage());
assertEquals("Failed to create the index demo", ex.getCause().getMessage());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class DeleteIndexStepTests extends OpenSearchTestCase {

private WorkflowData inputData = WorkflowData.EMPTY;
private Client client;
private AdminClient adminClient;
private DeleteIndexStep deleteIndexStep;
private ThreadContext threadContext;
private Metadata metadata;

@Mock
private IndicesAdminClient indicesAdminClient;
@Mock
private ThreadPool threadPool;
@Mock
IndexMetadata indexMetadata;

@Override
public void setUp() throws Exception {
super.setUp();
MockitoAnnotations.openMocks(this);
inputData = new WorkflowData(Map.ofEntries(Map.entry(INDEX_NAME, "demo")), "test-id", "test-node-id");
client = mock(Client.class);
adminClient = mock(AdminClient.class);
metadata = mock(Metadata.class);
Settings settings = Settings.builder().build();
threadContext = new ThreadContext(settings);

when(client.threadPool()).thenReturn(threadPool);
when(threadPool.getThreadContext()).thenReturn(threadContext);
when(client.admin()).thenReturn(adminClient);
when(adminClient.indices()).thenReturn(indicesAdminClient);
when(metadata.indices()).thenReturn(Map.of(GLOBAL_CONTEXT_INDEX, indexMetadata));

deleteIndexStep = new DeleteIndexStep(client);
}

public void testDeleteIndex() throws IOException, ExecutionException, InterruptedException {

@SuppressWarnings({ "unchecked" })
ArgumentCaptor<ActionListener<AcknowledgedResponse>> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class);
PlainActionFuture<WorkflowData> future = deleteIndexStep.execute(
inputData.getNodeId(),
inputData,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap()
);
assertFalse(future.isDone());
verify(indicesAdminClient, times(1)).delete(any(DeleteIndexRequest.class), actionListenerCaptor.capture());
actionListenerCaptor.getValue().onResponse(new AcknowledgedResponse(true));

assertTrue(future.isDone());

Map<String, Object> outputData = Map.of(INDEX_NAME, "demo");
assertEquals(outputData, future.get().getContent());
}

public void testDeleteIndexStepFailure() throws ExecutionException, InterruptedException {
@SuppressWarnings({ "unchecked" })
ArgumentCaptor<ActionListener<AcknowledgedResponse>> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class);
PlainActionFuture<WorkflowData> future = deleteIndexStep.execute(
inputData.getNodeId(),
inputData,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap()
);
assertFalse(future.isDone());
verify(indicesAdminClient, times(1)).delete(any(DeleteIndexRequest.class), actionListenerCaptor.capture());
actionListenerCaptor.getValue().onFailure(new Exception("Failed to delete the index"));

assertTrue(future.isDone());
ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent());
assertTrue(ex.getCause() instanceof Exception);
assertEquals("Failed to delete the index demo", ex.getCause().getMessage());
}
}

0 comments on commit e3efb04

Please sign in to comment.