Skip to content

Commit

Permalink
adding workflow state and create connector resources created
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Nov 6, 2023
1 parent ac76a44 commit da29f84
Show file tree
Hide file tree
Showing 27 changed files with 708 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
Expand Down Expand Up @@ -77,10 +80,9 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(clusterService, client, mlClient);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);

FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(client, clusterService);
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(clusterService, client, mlClient, flowFrameworkIndicesHandler);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);

return ImmutableList.of(workflowStepFactory, workflowProcessSorter, flowFrameworkIndicesHandler);
}
Expand All @@ -95,14 +97,15 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return ImmutableList.of(new RestCreateWorkflowAction(), new RestProvisionWorkflowAction());
return ImmutableList.of(new RestCreateWorkflowAction(), new RestProvisionWorkflowAction(), new RestGetWorkflowAction());
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return ImmutableList.of(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class)
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private CommonValue() {}
public static final String USER_FIELD = "user";

/** The transport action name prefix */
public static final String TRANSPORT_ACION_NAME_PREFIX = "cluster:admin/opensearch/flow_framework/";
public static final String TRANSPORT_ACTION_NAME_PREFIX = "cluster:admin/opensearch/flow_framework/";
/** The base URI for this plugin's rest actions */
public static final String FLOW_FRAMEWORK_BASE_URI = "/_plugins/_flow_framework";
/** The URI for this plugin's workflow rest actions */
Expand Down Expand Up @@ -130,4 +130,9 @@ private CommonValue() {}
public static final String USER_OUTPUTS_FIELD = "user_outputs";
/** The template field name for template resources created */
public static final String RESOURCES_CREATED_FIELD = "resources_created";
/** The field name for the ResourcesCreated's resource ID */
public static final String RESOURCE_ID_FIELD = "resource_id";
/** The field name for the ResourcesCreated's resource name */
public static final String RESOURCE_NAME_FIELD = "resource_type";

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.script.Script;

import java.io.IOException;
import java.net.URL;
Expand Down Expand Up @@ -292,7 +293,7 @@ public void putInitialStateToWorkflowState(String workflowId, User user, ActionL
.state(State.NOT_STARTED.name())
.provisioningProgress(ProvisioningProgress.NOT_STARTED.name())
.user(user)
.resourcesCreated(Collections.emptyMap())
.resourcesCreated(Collections.emptyList())
.userOutputs(Collections.emptyMap())
.build();
initWorkflowStateIndexIfAbsent(ActionListener.wrap(indexCreated -> {
Expand Down Expand Up @@ -381,4 +382,37 @@ public void updateFlowFrameworkSystemIndexDoc(
}
}
}

/**
* Updates a document in the workflow state index
* @param indexName the index that we will be updating a document of.
* @param documentId the document ID
* @param script the given script to update doc
* @param listener action listener
*/
public void updateFlowFrameworkSystemIndexDocWithScript(
String indexName,
String documentId,
Script script,
ActionListener<UpdateResponse> listener
) {
if (!doesIndexExist(indexName)) {
String exceptionMessage = "Failed to update document for given workflow due to missing " + indexName + " index";
logger.error(exceptionMessage);
listener.onFailure(new Exception(exceptionMessage));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest(indexName, documentId);
// TODO: Also add ability to change other fields at the same time when adding detailed provision progress
updateRequest.script(script);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to update {} entry : {}. {}", indexName, documentId, e.getMessage());
listener.onFailure(e);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ public enum ProvisioningProgress {
/** In Progress State */
IN_PROGRESS,
/** Done State */
DONE
DONE,
/** Failed State */
FAILED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.model;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.RESOURCE_ID_FIELD;
import static org.opensearch.flowframework.common.CommonValue.RESOURCE_NAME_FIELD;

/**
* This represents an object in the WorkflowState {@link WorkflowState}.
*/
public class ResourcesCreated implements ToXContentObject, Writeable {

private String resourceName;
private String resourceId;

/**
* Create this resources created object with given resource name and ID.
* @param resourceName The resource name associating to the step name where it was created
* @param resourceId The resources ID for relating to the created resource
*/
public ResourcesCreated(String resourceName, String resourceId) {
this.resourceName = resourceName;
this.resourceId = resourceId;
}

/**
* Create this resources created object with an StreamInput
* @param input the input stream to read from
* @throws IOException if failed to read input stream
*/
public ResourcesCreated(StreamInput input) throws IOException {
this.resourceName = input.readString();
this.resourceId = input.readString();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject()
.field(RESOURCE_NAME_FIELD, resourceName)
.field(RESOURCE_ID_FIELD, resourceId);
return xContentBuilder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(resourceName);
out.writeString(resourceId);
}

/**
* Parse raw JSON content into a resourcesCreated instance.
*
* @param parser JSON based content parser
* @return the parsed ResourcesCreated instance
* @throws IOException if content can't be parsed correctly
*/
public static ResourcesCreated parse(XContentParser parser) throws IOException {
String resourceName = null;
String resourceId = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();

switch (fieldName) {
case RESOURCE_NAME_FIELD:
resourceName = parser.text();
break;
case RESOURCE_ID_FIELD:
resourceId = parser.text();
break;
default:
throw new IOException("Unable to parse field [" + fieldName + "] in a resources_created object.");
}
}
return new ResourcesCreated(resourceName, resourceId);
}

}
21 changes: 20 additions & 1 deletion src/main/java/org/opensearch/flowframework/model/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.common.xcontent.yaml.YamlXContent;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
Expand All @@ -38,7 +40,7 @@
/**
* The Template is the central data structure which configures workflows. This object is used to parse JSON communicated via REST API.
*/
public class Template implements ToXContentObject {
public class Template implements ToXContentObject, Writeable {

private final String name;
private final String description;
Expand Down Expand Up @@ -111,6 +113,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return xContentBuilder.endObject();
}

// TODO: fix writeable when implementing get workflow API
@Override
public void writeTo(StreamOutput output) throws IOException {
output.writeString(name);
output.writeOptionalString(description);
output.writeString(useCase);
output.writeVersion(templateVersion);
// output.writeList((List<? extends Writeable>) compatibilityVersion);
output.writeMapWithConsistentOrder(workflows);
if (user != null) {
output.writeBoolean(true); // user exists
user.writeTo(output);
} else {
output.writeBoolean(false); // user does not exist
}
}

/**
* Parse raw json content into a Template instance.
*
Expand Down
Loading

0 comments on commit da29f84

Please sign in to comment.