Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Workflow Step for ReIndex #660

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,28 +94,14 @@ private CommonValue() {}
public static final String SUCCESS = "success";
/** Type field */
public static final String TYPE = "type";
/** default_mapping_option field */
public static final String DEFAULT_MAPPING_OPTION = "default_mapping_option";
/** ID Field */
public static final String ID = "id";
/** Processors field */
public static final String PROCESSORS = "processors";
/** Field map field */
public static final String FIELD_MAP = "field_map";
/** Input Field Name field */
public static final String INPUT_FIELD_NAME = "input_field_name";
/** Output Field Name field */
public static final String OUTPUT_FIELD_NAME = "output_field_name";
/** Task Id field */
public static final String TASK_ID = "task_id";
/** Register Model Status field */
public static final String REGISTER_MODEL_STATUS = "register_model_status";
/** Function Name field */
public static final String FUNCTION_NAME = "function_name";
/** Name field */
public static final String NAME_FIELD = "name";
/** Model Version field */
public static final String MODEL_VERSION = "model_version";
/** Model Group Status field */
public static final String MODEL_GROUP_STATUS = "model_group_status";
/** Description field */
Expand All @@ -128,8 +114,6 @@ private CommonValue() {}
public static final String MODEL_CONTENT_HASH_VALUE = "model_content_hash_value";
/** URL field */
public static final String URL = "url";
/** Model config field */
public static final String MODEL_CONFIG = "model_config";
/** Model type field */
public static final String MODEL_TYPE = "model_type";
/** Embedding dimension field */
Expand Down Expand Up @@ -175,6 +159,12 @@ private CommonValue() {}
/** Model Interface Field */
public static final String INTERFACE_FIELD = "interface";

/** Reindex field, used as the output key of the reindex workflow step */
public static final String RE_INDEX_FIELD = "reindex";
/** The source index field for reindex */
public static final String SOURCE_INDEX = "source_index";
/** The destination index field for reindex */
public static final String DESTINATION_INDEX = "destination_index";

/*
* Constants associated with resource provisioning / state
*/
Expand Down
148 changes: 148 additions & 0 deletions src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.ReindexAction;
import org.opensearch.index.reindex.ReindexRequest;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX;
import static org.opensearch.flowframework.common.CommonValue.RE_INDEX_FIELD;
import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX;

/**
 * Workflow step that executes a reindex request from a source index into a destination index
 * and records the destination index as a created resource in the workflow state index.
 */
public class ReIndexStep implements WorkflowStep {

    private static final Logger logger = LogManager.getLogger(ReIndexStep.class);
    private final Client client;
    private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;

    /** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
    public static final String NAME = "reindex";

    /**
     * Instantiate this class
     *
     * @param client Client used to execute the reindex request
     * @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
     */
    public ReIndexStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
        this.client = client;
        this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
    }

    /**
     * Reads {@code source_index} and {@code destination_index} from the step inputs, submits a reindex
     * request (with refresh enabled) and returns a future tracking the outcome.
     *
     * <p>The future completes successfully with a {@link WorkflowData} whose content maps
     * {@code reindex} to a single-entry map of source index to destination index, once the
     * destination index has been recorded in the state index. It completes exceptionally if input
     * parsing fails, the reindex request fails, the reindex response reports bulk or search
     * failures, or the state index update fails.
     *
     * @param currentNodeId the id of the workflow node being executed
     * @param currentNodeInputs the inputs configured on this node
     * @param outputs outputs of other steps, passed through to input resolution
     * @param previousNodeInputs previous node input mapping, passed through to input resolution
     * @param params params passed through to input resolution
     * @return a future that is always completed, either with the step output or with a failure
     */
    @Override
    public PlainActionFuture<WorkflowData> execute(
        String currentNodeId,
        WorkflowData currentNodeInputs,
        Map<String, WorkflowData> outputs,
        Map<String, String> previousNodeInputs,
        Map<String, String> params
    ) {

        PlainActionFuture<WorkflowData> reIndexFuture = PlainActionFuture.newFuture();

        Set<String> requiredKeys = Set.of(SOURCE_INDEX, DESTINATION_INDEX);
        Set<String> optionalKeys = Collections.emptySet();

        try {
            Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
                requiredKeys,
                optionalKeys,
                currentNodeInputs,
                outputs,
                previousNodeInputs,
                params
            );

            String sourceIndices = (String) inputs.get(SOURCE_INDEX);
            String destinationIndex = (String) inputs.get(DESTINATION_INDEX);

            ReindexRequest reindexRequest = new ReindexRequest();
            reindexRequest.setSourceIndices(sourceIndices);
            reindexRequest.setDestIndex(destinationIndex);
            reindexRequest.setRefresh(true);

            ActionListener<BulkByScrollResponse> actionListener = new ActionListener<>() {

                @Override
                public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
                    logger.info("Reindex from source: {} to destination {}", sourceIndices, destinationIndex);
                    try {
                        boolean hasBulkFailures = !bulkByScrollResponse.getBulkFailures().isEmpty();
                        boolean hasSearchFailures = !bulkByScrollResponse.getSearchFailures().isEmpty();

                        if (hasBulkFailures || hasSearchFailures) {
                            // A response can arrive through onResponse and still carry per-document or
                            // search failures. The future must be completed here, otherwise the
                            // workflow would wait on it forever.
                            String errorMessage = "Failed to reindex from source "
                                + sourceIndices
                                + " to "
                                + destinationIndex
                                + " with "
                                + bulkByScrollResponse.getBulkFailures().size()
                                + " bulk failure(s) and "
                                + bulkByScrollResponse.getSearchFailures().size()
                                + " search failure(s)";
                            logger.error(errorMessage);
                            reIndexFuture.onFailure(
                                new WorkflowStepException(
                                    errorMessage,
                                    // Report the status of the first recorded failure
                                    hasBulkFailures
                                        ? bulkByScrollResponse.getBulkFailures().get(0).getStatus()
                                        : bulkByScrollResponse.getSearchFailures().get(0).getStatus()
                                )
                            );
                            return;
                        }

                        flowFrameworkIndicesHandler.updateResourceInStateIndex(
                            currentNodeInputs.getWorkflowId(),
                            currentNodeId,
                            getName(),
                            destinationIndex,
                            ActionListener.wrap(response -> {
                                logger.info("successfully updated resource created in state index: {}", response.getIndex());

                                reIndexFuture.onResponse(
                                    new WorkflowData(
                                        Map.of(RE_INDEX_FIELD, Map.of(sourceIndices, destinationIndex)),
                                        currentNodeInputs.getWorkflowId(),
                                        currentNodeInputs.getNodeId()
                                    )
                                );
                            }, exception -> {
                                String errorMessage = "Failed to update new reindexed "
                                    + currentNodeId
                                    + " resource "
                                    + getName()
                                    + " id "
                                    + destinationIndex;
                                logger.error(errorMessage, exception);
                                reIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
                            })
                        );
                    } catch (Exception e) {
                        String errorMessage = "Failed to parse and update new created resource";
                        logger.error(errorMessage, e);
                        reIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    String errorMessage = "Failed to reindex from source " + sourceIndices + " to " + destinationIndex;
                    logger.error(errorMessage, e);
                    reIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
                }
            };

            client.execute(ReindexAction.INSTANCE, reindexRequest, actionListener);

        } catch (Exception e) {
            // Input resolution or request construction failed before the request was submitted
            reIndexFuture.onFailure(e);
        }

        return reIndexFuture;
    }

    /**
     * Returns the name of this step.
     *
     * @return {@link #NAME}
     */
    @Override
    public String getName() {
        return NAME;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.DESCRIPTION_FIELD;
import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX;
import static org.opensearch.flowframework.common.CommonValue.EMBEDDING_DIMENSION;
import static org.opensearch.flowframework.common.CommonValue.FRAMEWORK_TYPE;
import static org.opensearch.flowframework.common.CommonValue.FUNCTION_NAME;
Expand All @@ -47,6 +48,8 @@
import static org.opensearch.flowframework.common.CommonValue.PIPELINE_ID;
import static org.opensearch.flowframework.common.CommonValue.PROTOCOL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.CommonValue.RE_INDEX_FIELD;
import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.SUCCESS;
import static org.opensearch.flowframework.common.CommonValue.TOOLS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.TYPE;
Expand Down Expand Up @@ -84,6 +87,7 @@ public WorkflowStepFactory(
) {
stepMap.put(NoOpStep.NAME, NoOpStep::new);
stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler));
// The reindex step name must map to ReIndexStep, not CreateIndexStep
stepMap.put(ReIndexStep.NAME, () -> new ReIndexStep(client, flowFrameworkIndicesHandler));
stepMap.put(
RegisterLocalCustomModelStep.NAME,
() -> new RegisterLocalCustomModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings)
Expand Down Expand Up @@ -125,6 +129,9 @@ public enum WorkflowSteps {
/** Create Index Step */
CREATE_INDEX(CreateIndexStep.NAME, List.of(INDEX_NAME, CONFIGURATIONS), List.of(INDEX_NAME), Collections.emptyList(), null),

/** Create ReIndex Step */
RE_INDEX(ReIndexStep.NAME, List.of(SOURCE_INDEX, DESTINATION_INDEX), List.of(RE_INDEX_FIELD), Collections.emptyList(), null),

/** Create Connector Step */
CREATE_CONNECTOR(
CreateConnectorStep.NAME,
Expand Down
Loading