Skip to content

Commit

Permalink
Adding a state index (#110)
Browse files Browse the repository at this point in the history
* adding state index initial

Signed-off-by: Amit Galitzky <[email protected]>

* addressed comments and added more fields to state index

Signed-off-by: Amit Galitzky <[email protected]>

* addressed comments and fixed some unit tests

Signed-off-by: Amit Galitzky <[email protected]>

* moved variables to common value and adressed other comments

Signed-off-by: Amit Galitzky <[email protected]>

---------

Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Oct 30, 2023
1 parent bcd53e1 commit 4106cba
Show file tree
Hide file tree
Showing 30 changed files with 1,531 additions and 608 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ buildscript {
opensearch_group = "org.opensearch"
opensearch_no_snapshot = opensearch_build.replace("-SNAPSHOT","")
System.setProperty('tests.security.manager', 'false')
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
}

repositories {
Expand Down Expand Up @@ -135,6 +136,7 @@ dependencies {
implementation 'org.junit.jupiter:junit-jupiter:5.10.0'
implementation "com.google.guava:guava:32.1.3-jre"
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
implementation "org.opensearch:common-utils:${common_utils_version}"

configurations.all {
resolutionStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.flowframework.indices.GlobalContextHandler;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.workflow.CreateIndexStep;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -81,10 +80,9 @@ public Collection<Object> createComponents(
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(clusterService, client, mlClient);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);

// TODO : Refactor, move system index creation/associated methods outside of the CreateIndexStep
GlobalContextHandler globalContextHandler = new GlobalContextHandler(client, new CreateIndexStep(clusterService, client));
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(client, clusterService);

return ImmutableList.of(workflowStepFactory, workflowProcessSorter, globalContextHandler);
return ImmutableList.of(workflowStepFactory, workflowProcessSorter, flowFrameworkIndicesHandler);
}

@Override
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/org/opensearch/flowframework/common/CommonValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ private CommonValue() {}
public static final String GLOBAL_CONTEXT_INDEX_MAPPING = "mappings/global-context.json";
/** Global Context index mapping version */
public static final Integer GLOBAL_CONTEXT_INDEX_VERSION = 1;
/** Workflow State Index Name */
public static final String WORKFLOW_STATE_INDEX = ".plugins-workflow-state";
/** Workflow State index mapping file path */
public static final String WORKFLOW_STATE_INDEX_MAPPING = "mappings/workflow-state.json";
/** Workflow State index mapping version */
public static final Integer WORKFLOW_STATE_INDEX_VERSION = 1;

/** The template field name for template use case */
public static final String USE_CASE_FIELD = "use_case";
/** The template field name for template version */
Expand All @@ -35,6 +42,8 @@ private CommonValue() {}
public static final String COMPATIBILITY_FIELD = "compatibility";
/** The template field name for template workflows */
public static final String WORKFLOWS_FIELD = "workflows";
/** The template field name for the user who created the workflow **/
public static final String USER_FIELD = "user";

/** The transport action name prefix */
public static final String TRANSPORT_ACION_NAME_PREFIX = "cluster:admin/opensearch/flow_framework/";
Expand Down Expand Up @@ -86,4 +95,23 @@ private CommonValue() {}
public static final String MODEL_ACCESS_MODE = "access_mode";
/** Add all backend roles */
public static final String ADD_ALL_BACKEND_ROLES = "add_all_backend_roles";

/** The template field name for the associated workflowID **/
public static final String WORKFLOW_ID_FIELD = "workflow_id";
/** The template field name for the workflow error **/
public static final String ERROR_FIELD = "error";
/** The template field name for the workflow state **/
public static final String STATE_FIELD = "state";
/** The template field name for the workflow provisioning progress **/
public static final String PROVISIONING_PROGRESS_FIELD = "provisioning_progress";
/** The template field name for the workflow provisioning start time **/
public static final String PROVISION_START_TIME_FIELD = "provision_start_time";
/** The template field name for the workflow provisioning end time **/
public static final String PROVISION_END_TIME_FIELD = "provision_end_time";
/** The template field name for the workflow ui metadata **/
public static final String UI_METADATA_FIELD = "ui_metadata";
/** The template field name for template user outputs */
public static final String USER_OUTPUTS_FIELD = "user_outputs";
/** The template field name for template resources created */
public static final String RESOURCES_CREATED_FIELD = "resources_created";
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX_VERSION;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX_VERSION;

/**
* An enumeration of Flow Framework indices
Expand All @@ -24,8 +26,13 @@ public enum FlowFrameworkIndex {
*/
GLOBAL_CONTEXT(
GLOBAL_CONTEXT_INDEX,
ThrowingSupplierWrapper.throwingSupplierWrapper(GlobalContextHandler::getGlobalContextMappings),
ThrowingSupplierWrapper.throwingSupplierWrapper(FlowFrameworkIndicesHandler::getGlobalContextMappings),
GLOBAL_CONTEXT_INDEX_VERSION
),
WORKFLOW_STATE(
WORKFLOW_STATE_INDEX,
ThrowingSupplierWrapper.throwingSupplierWrapper(FlowFrameworkIndicesHandler::getWorkflowStateMappings),
WORKFLOW_STATE_INDEX_VERSION
);

private final String indexName;
Expand Down
Loading

0 comments on commit 4106cba

Please sign in to comment.