Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a get status API #148

Merged
merged 7 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowAction;
Expand Down Expand Up @@ -92,10 +95,9 @@ public Collection<Object> createComponents(
flowFrameworkFeatureEnabledSetting = new FlowFrameworkFeatureEnabledSetting(clusterService, settings);

MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(clusterService, client, mlClient);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);

FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(client, clusterService);
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(clusterService, client, mlClient, flowFrameworkIndicesHandler);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);

return ImmutableList.of(workflowStepFactory, workflowProcessSorter, flowFrameworkIndicesHandler);
}
Expand All @@ -113,7 +115,8 @@ public List<RestHandler> getRestHandlers(
return ImmutableList.of(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting)
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting)
);
}

Expand All @@ -122,7 +125,8 @@ public List<RestHandler> getRestHandlers(
return ImmutableList.of(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class),
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class)
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private CommonValue() {}
public static final String USER_FIELD = "user";

/** The transport action name prefix */
public static final String TRANSPORT_ACION_NAME_PREFIX = "cluster:admin/opensearch/flow_framework/";
public static final String TRANSPORT_ACTION_NAME_PREFIX = "cluster:admin/opensearch/flow_framework/";
/** The base URI for this plugin's rest actions */
public static final String FLOW_FRAMEWORK_BASE_URI = "/_plugins/_flow_framework";
/** The URI for this plugin's workflow rest actions */
Expand Down Expand Up @@ -148,4 +148,9 @@ private CommonValue() {}
public static final String USER_OUTPUTS_FIELD = "user_outputs";
/** The template field name for template resources created */
public static final String RESOURCES_CREATED_FIELD = "resources_created";
/** The field name for the ResourcesCreated's resource ID */
public static final String RESOURCE_ID_FIELD = "resource_id";
/** The field name for the ResourcesCreated's resource name */
public static final String WORKFLOW_STEP_NAME = "workflow_step_name";

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.script.Script;

import java.io.IOException;
import java.net.URL;
Expand Down Expand Up @@ -311,7 +312,7 @@
.state(State.NOT_STARTED.name())
.provisioningProgress(ProvisioningProgress.NOT_STARTED.name())
.user(user)
.resourcesCreated(Collections.emptyMap())
.resourcesCreated(Collections.emptyList())

Check warning on line 315 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L315

Added line #L315 was not covered by tests
.userOutputs(Collections.emptyMap())
.build();
initWorkflowStateIndexIfAbsent(ActionListener.wrap(indexCreated -> {
Expand All @@ -335,7 +336,7 @@
}

}, e -> {
String errorMessage = "Failed to create workflow_state index";

Check warning on line 339 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L339

Added line #L339 was not covered by tests
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e)));
}));
Expand Down Expand Up @@ -402,4 +403,37 @@
}
}
}

/**
* Updates a document in the workflow state index
* @param indexName the index that we will be updating a document of.
* @param documentId the document ID
* @param script the given script to update doc
* @param listener action listener
*/
public void updateFlowFrameworkSystemIndexDocWithScript(
String indexName,
String documentId,
Script script,
ActionListener<UpdateResponse> listener
) {
if (!doesIndexExist(indexName)) {
String exceptionMessage = "Failed to update document for given workflow due to missing " + indexName + " index";
logger.error(exceptionMessage);
listener.onFailure(new Exception(exceptionMessage));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest(indexName, documentId);

Check warning on line 426 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L421-L426

Added lines #L421 - L426 were not covered by tests
// TODO: Also add ability to change other fields at the same time when adding detailed provision progress
updateRequest.script(script);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

Check warning on line 429 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L428-L429

Added lines #L428 - L429 were not covered by tests
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to update {} entry : {}. {}", indexName, documentId, e.getMessage());
listener.onFailure(e);
}

Check warning on line 435 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L431-L435

Added lines #L431 - L435 were not covered by tests
}
}

Check warning on line 437 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L437

Added line #L437 was not covered by tests

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,7 @@ public enum ProvisioningProgress {
/** In Progress State */
IN_PROGRESS,
/** Done State */
DONE
DONE,
/** Failed State */
FAILED
}
122 changes: 122 additions & 0 deletions src/main/java/org/opensearch/flowframework/model/ResourcesCreated.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.model;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.RESOURCE_ID_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STEP_NAME;

/**
* This represents an object in the WorkflowState {@link WorkflowState}.
*/
public class ResourcesCreated implements ToXContentObject, Writeable {

private String workflowStepName;
private String resourceId;
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved

/**
* Create this resources created object with given resource name and ID.
* @param workflowStepName The workflow step name associating to the step where it was created
* @param resourceId The resources ID for relating to the created resource
*/
public ResourcesCreated(String workflowStepName, String resourceId) {
this.workflowStepName = workflowStepName;
this.resourceId = resourceId;
}

/**
* Create this resources created object with an StreamInput
* @param input the input stream to read from
* @throws IOException if failed to read input stream
*/
public ResourcesCreated(StreamInput input) throws IOException {
this.workflowStepName = input.readString();
this.resourceId = input.readString();
}

Check warning on line 50 in src/main/java/org/opensearch/flowframework/model/ResourcesCreated.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/ResourcesCreated.java#L47-L50

Added lines #L47 - L50 were not covered by tests

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject()
.field(WORKFLOW_STEP_NAME, workflowStepName)
.field(RESOURCE_ID_FIELD, resourceId);
return xContentBuilder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(workflowStepName);
out.writeString(resourceId);
}

Check warning on line 64 in src/main/java/org/opensearch/flowframework/model/ResourcesCreated.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/ResourcesCreated.java#L62-L64

Added lines #L62 - L64 were not covered by tests

/**
* Gets the resource id.
*
* @return the resourceId.
*/
public String resourceId() {
return resourceId;
}

/**
* Gets the workflow step name associated to the created resource
*
* @return the workflowStepName.
*/
public String workflowStepName() {
return workflowStepName;
}

/**
* Parse raw JSON content into a resourcesCreated instance.
*
* @param parser JSON based content parser
* @return the parsed ResourcesCreated instance
* @throws IOException if content can't be parsed correctly
*/
public static ResourcesCreated parse(XContentParser parser) throws IOException {
String workflowStepName = null;
String resourceId = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken();

switch (fieldName) {
case WORKFLOW_STEP_NAME:
workflowStepName = parser.text();
break;
case RESOURCE_ID_FIELD:
resourceId = parser.text();
break;
default:
throw new IOException("Unable to parse field [" + fieldName + "] in a resources_created object.");
}
}
if (workflowStepName == null || resourceId == null) {
throw new IOException("A resourcesCreated object requires both a workflowStepName and resourceId.");
}
return new ResourcesCreated(workflowStepName, resourceId);
}

@Override
public String toString() {
return "resources_Created [resource_name=" + workflowStepName + ", id=" + resourceId + "]";

Check warning on line 119 in src/main/java/org/opensearch/flowframework/model/ResourcesCreated.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/ResourcesCreated.java#L119

Added line #L119 was not covered by tests
}

amitgalitz marked this conversation as resolved.
Show resolved Hide resolved
}
21 changes: 20 additions & 1 deletion src/main/java/org/opensearch/flowframework/model/Template.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.common.xcontent.yaml.YamlXContent;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
Expand All @@ -39,7 +41,7 @@
/**
* The Template is the central data structure which configures workflows. This object is used to parse JSON communicated via REST API.
*/
public class Template implements ToXContentObject {
public class Template implements ToXContentObject, Writeable {

private String name;
private String description;
Expand Down Expand Up @@ -229,7 +231,7 @@
}

if (uiMetadata != null && !uiMetadata.isEmpty()) {
xContentBuilder.field(UI_METADATA_FIELD, uiMetadata);

Check warning on line 234 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L234

Added line #L234 was not covered by tests
}

xContentBuilder.endObject();
Expand All @@ -240,6 +242,23 @@
return xContentBuilder.endObject();
}

// TODO: fix writeable when implementing get workflow API
@Override
public void writeTo(StreamOutput output) throws IOException {
output.writeString(name);
output.writeOptionalString(description);
output.writeString(useCase);
output.writeVersion(templateVersion);

Check warning on line 251 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L248-L251

Added lines #L248 - L251 were not covered by tests
// output.writeList((List<? extends Writeable>) compatibilityVersion);
output.writeMapWithConsistentOrder(workflows);

Check warning on line 253 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L253

Added line #L253 was not covered by tests
if (user != null) {
output.writeBoolean(true); // user exists
user.writeTo(output);

Check warning on line 256 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L255-L256

Added lines #L255 - L256 were not covered by tests
} else {
output.writeBoolean(false); // user does not exist

Check warning on line 258 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L258

Added line #L258 was not covered by tests
}
}

Check warning on line 260 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L260

Added line #L260 was not covered by tests

/**
* Parse raw json content into a Template instance.
*
Expand Down Expand Up @@ -300,8 +319,8 @@
}
break;
case UI_METADATA_FIELD:
uiMetadata = parser.map();
break;

Check warning on line 323 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L322-L323

Added lines #L322 - L323 were not covered by tests
case USER_FIELD:
user = User.parse(parser);
break;
Expand Down
Loading
Loading