Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added workflow step for ReIndex Step #718

Merged
merged 5 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Enhancements
### Bug Fixes
- Add user mapping to Workflow State index ([#705](https://github.com/opensearch-project/flow-framework/pull/705))
- Add workflow step to reindex from a source index to a destination index ([#718](https://github.com/opensearch-project/flow-framework/pull/718))
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved

### Infrastructure
### Documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,22 @@ private CommonValue() {}
public static final String DELAY_FIELD = "delay";
/** Model Interface Field */
public static final String INTERFACE_FIELD = "interface";

/** The reindex field for created resources */
public static final String RE_INDEX_FIELD = "reindex";
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
/** The source index field for reindex */
public static final String SOURCE_INDEX = "source_index";
/** The destination index field for reindex */
public static final String DESTINATION_INDEX = "destination_index";
/** The refresh field for reindex */
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
public static final String REFRESH = "refresh";
/** The requests_per_second field for reindex */
public static final String REQUESTS_PER_SECOND = "requests_per_second";
/** The require_alias field for reindex */
public static final String REQUIRE_ALIAS = "require_alias";
/** The slices field for reindex */
public static final String SLICES = "slices";
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
/** The max_docs field for reindex */
public static final String MAX_DOCS = "max_docs";
/*
* Constants associated with resource provisioning / state
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.flowframework.workflow.DeleteModelStep;
import org.opensearch.flowframework.workflow.DeployModelStep;
import org.opensearch.flowframework.workflow.NoOpStep;
import org.opensearch.flowframework.workflow.ReIndexStep;
import org.opensearch.flowframework.workflow.RegisterAgentStep;
import org.opensearch.flowframework.workflow.RegisterLocalCustomModelStep;
import org.opensearch.flowframework.workflow.RegisterLocalPretrainedModelStep;
Expand Down Expand Up @@ -58,6 +59,8 @@ public enum WorkflowResources {
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step
/** Workflow steps for creating an index and associated created resource */
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME),
/** Workflow steps for reindexing a source index to a destination index and the associated created resource */
RE_INDEX(ReIndexStep.NAME, CommonValue.DESTINATION_INDEX, NoOpStep.NAME),
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
/** Workflow steps for registering/deleting an agent and the associated created resource */
REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME);

Expand Down
175 changes: 175 additions & 0 deletions src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.common.Booleans;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.ReindexAction;
import org.opensearch.index.reindex.ReindexRequest;

import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX;
import static org.opensearch.flowframework.common.CommonValue.MAX_DOCS;
import static org.opensearch.flowframework.common.CommonValue.REFRESH;
import static org.opensearch.flowframework.common.CommonValue.REQUESTS_PER_SECOND;
import static org.opensearch.flowframework.common.CommonValue.REQUIRE_ALIAS;
import static org.opensearch.flowframework.common.CommonValue.RE_INDEX_FIELD;
import static org.opensearch.flowframework.common.CommonValue.SLICES;
import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX;

/**
* Step to reindex
*/
public class ReIndexStep implements WorkflowStep {
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved

private static final Logger logger = LogManager.getLogger(ReIndexStep.class);
private final Client client;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "reindex";

/**
* Instantiate this class
*
* @param client Client to create an index
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
public ReIndexStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
this.client = client;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}

@Override
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
) {

PlainActionFuture<WorkflowData> reIndexFuture = PlainActionFuture.newFuture();

Set<String> requiredKeys = Set.of(SOURCE_INDEX, DESTINATION_INDEX);
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved

Set<String> optionalKeys = Set.of(REFRESH, REQUESTS_PER_SECOND, REQUIRE_ALIAS, SLICES, MAX_DOCS);

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);

String sourceIndices = (String) inputs.get(SOURCE_INDEX);
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
String destinationIndex = (String) inputs.get(DESTINATION_INDEX);
Boolean refresh = inputs.containsKey(REFRESH) ? Booleans.parseBoolean(inputs.get(REFRESH).toString()) : null;
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
Integer requestsPerSecond = (Integer) inputs.get(REQUESTS_PER_SECOND);
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
Boolean requireAlias = inputs.containsKey(REQUIRE_ALIAS) ? Booleans.parseBoolean(inputs.get(REQUIRE_ALIAS).toString()) : null;
Integer slices = (Integer) inputs.get(SLICES);
Integer maxDocs = (Integer) inputs.get(MAX_DOCS);

ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndices);
reindexRequest.setDestIndex(destinationIndex);
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
if (refresh != null) {
reindexRequest.setRefresh(refresh);
}
if (requestsPerSecond != null) {
reindexRequest.setRequestsPerSecond(requestsPerSecond);
}
if (requireAlias != null) {
reindexRequest.setRequireAlias(requireAlias);
}
if (maxDocs != null) {
reindexRequest.setMaxDocs(maxDocs);
}
if (slices != null) {
reindexRequest.setSlices(slices);
}

ActionListener<BulkByScrollResponse> actionListener = new ActionListener<>() {

@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
logger.info("Reindex from source: {} to destination {}", sourceIndices, destinationIndex);
try {
if (bulkByScrollResponse.getBulkFailures().isEmpty() && bulkByScrollResponse.getSearchFailures().isEmpty()) {
dbwiddis marked this conversation as resolved.
Show resolved Hide resolved
flowFrameworkIndicesHandler.updateResourceInStateIndex(
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
destinationIndex,
ActionListener.wrap(response -> {
logger.info("successfully updated resource created in state index: {}", response.getIndex());

reIndexFuture.onResponse(
new WorkflowData(
Map.of(RE_INDEX_FIELD, Map.of(sourceIndices, destinationIndex)),
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}, exception -> {
String errorMessage = "Failed to update new reindexed"

Check warning on line 136 in src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java#L136

Added line #L136 was not covered by tests
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
+ currentNodeId
+ " resource "
+ getName()

Check warning on line 139 in src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java#L139

Added line #L139 was not covered by tests
+ " id "
+ destinationIndex;
logger.error(errorMessage, exception);
reIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
})

Check warning on line 144 in src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java#L142-L144

Added lines #L142 - L144 were not covered by tests
);
}
} catch (Exception e) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
reIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));

Check warning on line 150 in src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java#L147-L150

Added lines #L147 - L150 were not covered by tests
}
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to reindex from source " + sourceIndices + " to " + destinationIndex;
logger.error(errorMessage, e);
reIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
};

client.execute(ReindexAction.INSTANCE, reindexRequest, actionListener);

} catch (Exception e) {
reIndexFuture.onFailure(e);

Check warning on line 165 in src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java#L164-L165

Added lines #L164 - L165 were not covered by tests
}

return reIndexFuture;
}

@Override
public String getName() {
return NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.DESCRIPTION_FIELD;
import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX;
import static org.opensearch.flowframework.common.CommonValue.EMBEDDING_DIMENSION;
import static org.opensearch.flowframework.common.CommonValue.FRAMEWORK_TYPE;
import static org.opensearch.flowframework.common.CommonValue.FUNCTION_NAME;
Expand All @@ -47,6 +48,8 @@
import static org.opensearch.flowframework.common.CommonValue.PIPELINE_ID;
import static org.opensearch.flowframework.common.CommonValue.PROTOCOL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.CommonValue.RE_INDEX_FIELD;
import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.SUCCESS;
import static org.opensearch.flowframework.common.CommonValue.TOOLS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.TYPE;
Expand Down Expand Up @@ -84,6 +87,7 @@ public WorkflowStepFactory(
) {
stepMap.put(NoOpStep.NAME, NoOpStep::new);
stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler));
stepMap.put(ReIndexStep.NAME, () -> new ReIndexStep(client, flowFrameworkIndicesHandler));
stepMap.put(
RegisterLocalCustomModelStep.NAME,
() -> new RegisterLocalCustomModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings)
Expand Down Expand Up @@ -125,6 +129,9 @@ public enum WorkflowSteps {
/** Create Index Step */
CREATE_INDEX(CreateIndexStep.NAME, List.of(INDEX_NAME, CONFIGURATIONS), List.of(INDEX_NAME), Collections.emptyList(), null),

/** ReIndex Step */
RE_INDEX(ReIndexStep.NAME, List.of(SOURCE_INDEX, DESTINATION_INDEX), List.of(RE_INDEX_FIELD), Collections.emptyList(), null),

/** Create Connector Step */
CREATE_CONNECTOR(
CreateConnectorStep.NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testParseWorkflowValidator() throws IOException {

WorkflowValidator validator = new WorkflowValidator(workflowStepValidators);

assertEquals(17, validator.getWorkflowStepValidators().size());
assertEquals(18, validator.getWorkflowStepValidators().size());

assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector"));
assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size());
Expand Down
Loading
Loading