Skip to content

Commit

Permalink
Added Create index workflow step (#574)
Browse files Browse the repository at this point in the history
* Implemented Create Index Step

Signed-off-by: Owais Kazi <[email protected]>

* Added DeleteIndex Step and tests

Signed-off-by: Owais Kazi <[email protected]>

* Added changelog entry

Signed-off-by: Owais Kazi <[email protected]>

* Removed DeleteIndexStep and tests

Signed-off-by: Owais Kazi <[email protected]>

---------

Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 committed Mar 14, 2024
1 parent 10915aa commit 1f6573d
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 90 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.12...2.x)
### Features
- Adding create ingest pipeline step ([#558](https://github.com/opensearch-project/flow-framework/pull/558))
- Adding create search pipeline step ([#569](https://github.com/opensearch-project/flow-framework/pull/569))
- Added create ingest pipeline step ([#558](https://github.com/opensearch-project/flow-framework/pull/558))
- Added create search pipeline step ([#569](https://github.com/opensearch-project/flow-framework/pull/569))
- Added create index step ([#574](https://github.com/opensearch-project/flow-framework/pull/574))

### Enhancements
- Substitute REST path or body parameters in Workflow Steps ([#525](https://github.com/opensearch-project/flow-framework/pull/525))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public enum WorkflowResources {
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step
/** Workflow steps for creating an index and associated created resource */
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, null), // TODO delete step
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME),
/** Workflow steps for registering/deleting an agent and the associated created resource */
REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,24 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Set;

import static org.opensearch.flowframework.common.CommonValue.DEFAULT_MAPPING_OPTION;
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;

/**
Expand All @@ -37,23 +38,19 @@
public class CreateIndexStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(CreateIndexStep.class);
private final ClusterService clusterService;
private final Client client;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "create_index";
static Map<String, AtomicBoolean> indexMappingUpdated = new HashMap<>();

/**
* Instantiate this class
*
* @param clusterService The OpenSearch cluster service
* @param client Client to create an index
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
public CreateIndexStep(ClusterService clusterService, Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
this.clusterService = clusterService;
public CreateIndexStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
this.client = client;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}
Expand All @@ -67,89 +64,66 @@ public PlainActionFuture<WorkflowData> execute(
Map<String, String> params
) {
PlainActionFuture<WorkflowData> createIndexFuture = PlainActionFuture.newFuture();
ActionListener<CreateIndexResponse> actionListener = new ActionListener<>() {

@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
Set<String> requiredKeys = Set.of(INDEX_NAME, CONFIGURATIONS);

Set<String> optionalKeys = Collections.emptySet();

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);

String indexName = (String) inputs.get(INDEX_NAME);

String configurations = (String) inputs.get(CONFIGURATIONS);

byte[] byteArr = configurations.getBytes(StandardCharsets.UTF_8);
BytesReference configurationsBytes = new BytesArray(byteArr);

CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).source(configurationsBytes, XContentType.JSON);
client.admin().indices().create(createIndexRequest, ActionListener.wrap(acknowledgedResponse -> {
String resourceName = getResourceByWorkflowStep(getName());
logger.info("Created index: {}", indexName);
try {
String resourceName = getResourceByWorkflowStep(getName());
logger.info("created index: {}", createIndexResponse.index());
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
createIndexResponse.index(),
indexName,
ActionListener.wrap(response -> {
logger.info("successfully updated resource created in state index: {}", response.getIndex());
createIndexFuture.onResponse(
new WorkflowData(
Map.of(resourceName, createIndexResponse.index()),
currentNodeInputs.getWorkflowId(),
currentNodeId
)
new WorkflowData(Map.of(resourceName, indexName), currentNodeInputs.getWorkflowId(), currentNodeId)
);
}, exception -> {
String errorMessage = "Failed to update new created "
+ currentNodeId
+ " resource "
+ getName()
+ " id "
+ createIndexResponse.index();
+ indexName;
logger.error(errorMessage, exception);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
})
);
} catch (Exception e) {
} catch (IOException ex) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
logger.error(errorMessage, ex);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(ex)));
}
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to create an index";
}, e -> {
String errorMessage = "Failed to create the index " + indexName;
logger.error(errorMessage, e);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
};

String index = null;
String defaultMappingOption = null;
Settings settings = null;

// TODO: Recreating the list to get this compiling
// Need to refactor the below iteration to pull directly from the maps
List<WorkflowData> data = new ArrayList<>();
data.add(currentNodeInputs);
data.addAll(outputs.values());

try {
for (WorkflowData workflowData : data) {
Map<String, Object> content = workflowData.getContent();
index = (String) content.get(getResourceByWorkflowStep(getName()));
defaultMappingOption = (String) content.get(DEFAULT_MAPPING_OPTION);
if (index != null && defaultMappingOption != null && settings != null) {
break;
}
}
} catch (Exception e) {
String errorMessage = "Failed to find the correct resource for the workflow step " + NAME;
logger.error(errorMessage, e);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}

// TODO:
// 1. Create settings based on the index settings received from content

try {
CreateIndexRequest request = new CreateIndexRequest(index).mapping(
FlowFrameworkIndicesHandler.getIndexMappings("mappings/" + defaultMappingOption + ".json"),
JsonXContent.jsonXContent.mediaType()
);
client.admin().indices().create(request, actionListener);
}));
} catch (Exception e) {
logger.error("Failed to find the right mapping for the index", e);
createIndexFuture.onFailure(e);
}

return createIndexFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.AGENT_ID;
import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;

Expand Down Expand Up @@ -82,6 +83,7 @@ public WorkflowStepFactory(
Client client
) {
stepMap.put(NoOpStep.NAME, NoOpStep::new);
stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler));
stepMap.put(
RegisterLocalCustomModelStep.NAME,
() -> new RegisterLocalCustomModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings)
Expand Down Expand Up @@ -120,6 +122,9 @@ public enum WorkflowSteps {
/** Noop Step */
NOOP("noop", Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), null),

/** Create Index Step */
CREATE_INDEX(CreateIndexStep.NAME, List.of(INDEX_NAME, CONFIGURATIONS), List.of(INDEX_NAME), Collections.emptyList(), null),

/** Create Connector Step */
CREATE_CONNECTOR(
CreateConnectorStep.NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testParseWorkflowValidator() throws IOException {

WorkflowValidator validator = new WorkflowValidator(workflowStepValidators);

assertEquals(16, validator.getWorkflowStepValidators().size());
assertEquals(17, validator.getWorkflowStepValidators().size());

assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector"));
assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
Expand All @@ -30,16 +27,15 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;
Expand All @@ -59,10 +55,7 @@ public class CreateIndexStepTests extends OpenSearchTestCase {
private CreateIndexStep createIndexStep;
private ThreadContext threadContext;
private Metadata metadata;
private Map<String, AtomicBoolean> indexMappingUpdated = new HashMap<>();

@Mock
private ClusterService clusterService;
@Mock
private IndicesAdminClient indicesAdminClient;
@Mock
Expand All @@ -76,12 +69,14 @@ public void setUp() throws Exception {
super.setUp();
this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class);
MockitoAnnotations.openMocks(this);
String configurations =
"{\"settings\":{\"index\":{\"number_of_shards\":2,\"number_of_replicas\":1}},\"mappings\":{\"_doc\":{\"properties\":{\"age\":{\"type\":\"integer\"}}}},\"aliases\":{\"sample-alias1\":{}}}";

inputData = new WorkflowData(
Map.ofEntries(Map.entry(INDEX_NAME, "demo"), Map.entry("default_mapping_option", "knn")),
Map.ofEntries(Map.entry(INDEX_NAME, "demo"), Map.entry(CONFIGURATIONS, configurations)),
"test-id",
"test-node-id"
);
clusterService = mock(ClusterService.class);
client = mock(Client.class);
adminClient = mock(AdminClient.class);
metadata = mock(Metadata.class);
Expand All @@ -92,11 +87,9 @@ public void setUp() throws Exception {
when(threadPool.getThreadContext()).thenReturn(threadContext);
when(client.admin()).thenReturn(adminClient);
when(adminClient.indices()).thenReturn(indicesAdminClient);
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test cluster")).build());
when(metadata.indices()).thenReturn(Map.of(GLOBAL_CONTEXT_INDEX, indexMetadata));

createIndexStep = new CreateIndexStep(clusterService, client, flowFrameworkIndicesHandler);
CreateIndexStep.indexMappingUpdated = indexMappingUpdated;
createIndexStep = new CreateIndexStep(client, flowFrameworkIndicesHandler);
}

public void testCreateIndexStep() throws ExecutionException, InterruptedException, IOException {
Expand Down Expand Up @@ -145,6 +138,6 @@ public void testCreateIndexStepFailure() throws ExecutionException, InterruptedE
assertTrue(future.isDone());
ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent());
assertTrue(ex.getCause() instanceof Exception);
assertEquals("Failed to create an index", ex.getCause().getMessage());
assertEquals("Failed to create the index demo", ex.getCause().getMessage());
}
}

0 comments on commit 1f6573d

Please sign in to comment.