Skip to content

Commit

Permalink
Initial commit for reindex step
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 committed May 20, 2024
1 parent e3a8784 commit bdf614d
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 16 deletions.
22 changes: 6 additions & 16 deletions src/main/java/org/opensearch/flowframework/common/CommonValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,28 +94,14 @@ private CommonValue() {}
public static final String SUCCESS = "success";
/** Type field */
public static final String TYPE = "type";
/** default_mapping_option filed */
public static final String DEFAULT_MAPPING_OPTION = "default_mapping_option";
/** ID Field */
public static final String ID = "id";
/** Processors field */
public static final String PROCESSORS = "processors";
/** Field map field */
public static final String FIELD_MAP = "field_map";
/** Input Field Name field */
public static final String INPUT_FIELD_NAME = "input_field_name";
/** Output Field Name field */
public static final String OUTPUT_FIELD_NAME = "output_field_name";
/** Task Id field */
public static final String TASK_ID = "task_id";
/** Register Model Status field */
public static final String REGISTER_MODEL_STATUS = "register_model_status";
/** Function Name field */
public static final String FUNCTION_NAME = "function_name";
/** Name field */
public static final String NAME_FIELD = "name";
/** Model Version field */
public static final String MODEL_VERSION = "model_version";
/** Model Group Id field */
public static final String MODEL_GROUP_STATUS = "model_group_status";
/** Description field */
Expand All @@ -128,8 +114,6 @@ private CommonValue() {}
public static final String MODEL_CONTENT_HASH_VALUE = "model_content_hash_value";
/** URL field */
public static final String URL = "url";
/** Model config field */
public static final String MODEL_CONFIG = "model_config";
/** Model type field */
public static final String MODEL_TYPE = "model_type";
/** Embedding dimension field */
Expand Down Expand Up @@ -175,6 +159,12 @@ private CommonValue() {}
/** Model Interface Field */
public static final String INTERFACE_FIELD = "interface";

public static final String RE_INDEX_FIELD = "reindex";
/** The source index field for reindex */
public static final String SOURCE_INDEX = "source_index";
/** The destination index field for reindex */
public static final String DESTINATION_INDEX = "destination_index";

/*
* Constants associated with resource provisioning / state
*/
Expand Down
148 changes: 148 additions & 0 deletions src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.ReindexAction;
import org.opensearch.index.reindex.ReindexRequest;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX;
import static org.opensearch.flowframework.common.CommonValue.RE_INDEX_FIELD;
import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX;

public class ReIndexStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(ReIndexStep.class);
private final Client client;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "reindex";

/**
* Instantiate this class
*
* @param client Client to create an index
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
public ReIndexStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
this.client = client;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}

@Override
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
) {

PlainActionFuture<WorkflowData> reIndexFuture = PlainActionFuture.newFuture();

Set<String> requiredKeys = Set.of(SOURCE_INDEX, DESTINATION_INDEX);

Set<String> optionalKeys = Collections.emptySet();

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);

String sourceIndices = (String) inputs.get(SOURCE_INDEX);
String destinationIndex = (String) inputs.get(DESTINATION_INDEX);

ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndices);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setRefresh(true);

ActionListener<BulkByScrollResponse> actionListener = new ActionListener<>() {

@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
logger.info("Reindex from source: {} to destination {}", sourceIndices, destinationIndex);
try {
if (bulkByScrollResponse.getBulkFailures().isEmpty() && bulkByScrollResponse.getSearchFailures().isEmpty()) {
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
destinationIndex,
ActionListener.wrap(response -> {
logger.info("successfully updated resource created in state index: {}", response.getIndex());

reIndexFuture.onResponse(
new WorkflowData(
Map.of(RE_INDEX_FIELD, Map.of(sourceIndices, destinationIndex)),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}, exception -> {
String errorMessage = "Failed to update new reindexed"
+ currentNodeId
+ " resource "
+ getName()
+ " id "
+ destinationIndex;
logger.error(errorMessage, exception);
reIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
})
);
}
} catch (Exception e) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
reIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to reindex from source" + sourceIndices + "to" + destinationIndex;
logger.error(errorMessage, e);
reIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
};

client.execute(ReindexAction.INSTANCE, reindexRequest, actionListener);

} catch (Exception e) {
reIndexFuture.onFailure(e);
}

return reIndexFuture;
}

@Override
public String getName() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.DESCRIPTION_FIELD;
import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX;
import static org.opensearch.flowframework.common.CommonValue.EMBEDDING_DIMENSION;
import static org.opensearch.flowframework.common.CommonValue.FRAMEWORK_TYPE;
import static org.opensearch.flowframework.common.CommonValue.FUNCTION_NAME;
Expand All @@ -47,6 +48,8 @@
import static org.opensearch.flowframework.common.CommonValue.PIPELINE_ID;
import static org.opensearch.flowframework.common.CommonValue.PROTOCOL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.CommonValue.RE_INDEX_FIELD;
import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.SUCCESS;
import static org.opensearch.flowframework.common.CommonValue.TOOLS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.TYPE;
Expand Down Expand Up @@ -84,6 +87,7 @@ public WorkflowStepFactory(
) {
stepMap.put(NoOpStep.NAME, NoOpStep::new);
stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler));
stepMap.put(ReIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler));
stepMap.put(
RegisterLocalCustomModelStep.NAME,
() -> new RegisterLocalCustomModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings)
Expand Down Expand Up @@ -125,6 +129,9 @@ public enum WorkflowSteps {
/** Create Index Step */
CREATE_INDEX(CreateIndexStep.NAME, List.of(INDEX_NAME, CONFIGURATIONS), List.of(INDEX_NAME), Collections.emptyList(), null),

/** Create ReIndex Step */
RE_INDEX(ReIndexStep.NAME, List.of(SOURCE_INDEX, DESTINATION_INDEX), List.of(RE_INDEX_FIELD), Collections.emptyList(), null),

/** Create Connector Step */
CREATE_CONNECTOR(
CreateConnectorStep.NAME,
Expand Down

0 comments on commit bdf614d

Please sign in to comment.