Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Nov 14, 2023
1 parent c5b7d72 commit 0d77699
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,6 @@ private CommonValue() {}
/** The field name for the ResourcesCreated's resource ID */
public static final String RESOURCE_ID_FIELD = "resource_id";
/** The field name for the ResourcesCreated's resource name */
public static final String RESOURCE_NAME_FIELD = "resource_type";
public static final String WORKFLOW_STEP_NAME = "workflow_step_name";

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.RESOURCE_ID_FIELD;
import static org.opensearch.flowframework.common.CommonValue.RESOURCE_NAME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STEP_NAME;

/**
* This represents an object in the WorkflowState {@link WorkflowState}.
*/
public class ResourcesCreated implements ToXContentObject, Writeable {

private String resourceName;
private String workflowStepName;
private String resourceId;

/**
* Create this resources created object with given resource name and ID.
* @param resourceName The resource name associating to the step name where it was created
* @param workflowStepName The workflow step name associating to the step where it was created
* @param resourceId The resources ID for relating to the created resource
*/
public ResourcesCreated(String resourceName, String resourceId) {
this.resourceName = resourceName;
public ResourcesCreated(String workflowStepName, String resourceId) {
this.workflowStepName = workflowStepName;
this.resourceId = resourceId;
}

Expand All @@ -45,24 +45,42 @@ public ResourcesCreated(String resourceName, String resourceId) {
* @throws IOException if failed to read input stream
*/
public ResourcesCreated(StreamInput input) throws IOException {
this.resourceName = input.readString();
this.workflowStepName = input.readString();
this.resourceId = input.readString();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject()
.field(RESOURCE_NAME_FIELD, resourceName)
.field(WORKFLOW_STEP_NAME, workflowStepName)
.field(RESOURCE_ID_FIELD, resourceId);
return xContentBuilder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(resourceName);
out.writeString(workflowStepName);
out.writeString(resourceId);
}

/**
* Gets the resource id.
*
* @return the resourceId.
*/
public String resourceId() {
return resourceId;
}

/**
* Gets the workflow step name associated to the created resource
*
* @return the workflowStepName.
*/
public String workflowStepName() {
return workflowStepName;
}

/**
* Parse raw JSON content into a resourcesCreated instance.
*
Expand All @@ -71,7 +89,7 @@ public void writeTo(StreamOutput out) throws IOException {
* @throws IOException if content can't be parsed correctly
*/
public static ResourcesCreated parse(XContentParser parser) throws IOException {
String resourceName = null;
String workflowStepName = null;
String resourceId = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
Expand All @@ -80,8 +98,8 @@ public static ResourcesCreated parse(XContentParser parser) throws IOException {
parser.nextToken();

switch (fieldName) {
case RESOURCE_NAME_FIELD:
resourceName = parser.text();
case WORKFLOW_STEP_NAME:
workflowStepName = parser.text();
break;
case RESOURCE_ID_FIELD:
resourceId = parser.text();
Expand All @@ -90,7 +108,15 @@ public static ResourcesCreated parse(XContentParser parser) throws IOException {
throw new IOException("Unable to parse field [" + fieldName + "] in a resources_created object.");
}
}
return new ResourcesCreated(resourceName, resourceId);
if (workflowStepName == null || resourceId == null) {
throw new IOException("A resourcesCreated object requires both a workflowStepName and resourceId.");
}
return new ResourcesCreated(workflowStepName, resourceId);
}

@Override
public String toString() {
return "resources_Created [resource_name=" + workflowStepName + ", id=" + resourceId + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
Expand All @@ -19,7 +20,7 @@
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.flowframework.transport.GetWorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
Expand Down Expand Up @@ -67,22 +68,22 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request

// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("Invalid request format", RestStatus.BAD_REQUEST);
throw new FlowFrameworkException("No request body present", RestStatus.BAD_REQUEST);
}
// Validate params
String workflowId = request.param(WORKFLOW_ID);
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}

boolean all = request.paramAsBoolean("_all", false);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, false, all);
return channel -> client.execute(GetWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
boolean all = request.paramAsBoolean("all", false);
GetWorkflowRequest getWorkflowRequest = new GetWorkflowRequest(workflowId, all);
return channel -> client.execute(GetWorkflowAction.INSTANCE, getWorkflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = (FlowFrameworkException) exception;
FlowFrameworkException ex = new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.Nullable;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Transport Request to get a workflow or workflow status
*/
public class GetWorkflowRequest extends ActionRequest {

/**
* The documentId of the workflow entry within the Global Context index
*/
@Nullable
private String workflowId;

/**
* The all parameter on the get request
*/
private boolean all;

/**
* Instantiates a new GetWorkflowRequest
* @param workflowId the documentId of the workflow
* @param all whether the get request is looking for all fields in status
*/
public GetWorkflowRequest(@Nullable String workflowId, boolean all) {
this.workflowId = workflowId;
this.all = all;
}

/**
* Instantiates a new GetWorkflowRequest request
* @param in The input stream to read from
* @throws IOException If the stream cannot be read properly
*/
public GetWorkflowRequest(StreamInput in) throws IOException {
super(in);
this.workflowId = in.readString();
this.all = in.readBoolean();
}

/**
* Gets the workflow Id of the request
* @return the workflow Id
*/
@Nullable
public String getWorkflowId() {
return this.workflowId;
}

/**
* Gets the value of the all parameter
* @return whether the all parameter was present or not in request
*/
public boolean getAll() {
return this.all;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(workflowId);
out.writeBoolean(all);
}

@Override
public ActionRequestValidationException validate() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@
//TODO: Currently we only get the workflow status but we should change to be able to get the
// full template as well
/**
* Transport Action to get status of a current workflow
* Transport Action to get a specific workflow. Currently, we only support the action with _status
* in the API path but will add the ability to get the workflow and not just the status in the future
*/
public class GetWorkflowTransportAction extends HandledTransportAction<WorkflowRequest, GetWorkflowResponse> {
public class GetWorkflowTransportAction extends HandledTransportAction<GetWorkflowRequest, GetWorkflowResponse> {

private final Logger logger = LogManager.getLogger(GetWorkflowTransportAction.class);

Expand All @@ -57,13 +58,13 @@ public GetWorkflowTransportAction(
Client client,
NamedXContentRegistry xContentRegistry
) {
super(GetWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
super(GetWorkflowAction.NAME, transportService, actionFilters, GetWorkflowRequest::new);
this.client = client;
this.xContentRegistry = xContentRegistry;
}

@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<GetWorkflowResponse> listener) {
protected void doExecute(Task task, GetWorkflowRequest request, ActionListener<GetWorkflowResponse> listener) {
String workflowId = request.getWorkflowId();
User user = ParseUtils.getUserContext(client);
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX).id(workflowId);
Expand All @@ -81,11 +82,11 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<GetW
listener.onFailure(e);
}
} else {
listener.onFailure(new FlowFrameworkException("Fail to find workflow", RestStatus.BAD_REQUEST));
listener.onFailure(new FlowFrameworkException("Fail to find workflow", RestStatus.NOT_FOUND));
}
}, e -> {
if (e instanceof IndexNotFoundException) {
listener.onFailure(new FlowFrameworkException("Fail to find workflow", RestStatus.BAD_REQUEST));
listener.onFailure(new FlowFrameworkException("Fail to find workflow", RestStatus.NOT_FOUND));
} else {
logger.error("Failed to get workflow status of " + workflowId, e);
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
Expand All @@ -46,6 +47,7 @@
import static org.opensearch.flowframework.common.CommonValue.PROVISION_START_TIME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_THREAD_POOL;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.CommonValue.RESOURCES_CREATED_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX;

Expand Down Expand Up @@ -125,7 +127,9 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
PROVISIONING_PROGRESS_FIELD,
ProvisioningProgress.IN_PROGRESS,
PROVISION_START_TIME_FIELD,
Instant.now().toEpochMilli()
Instant.now().toEpochMilli(),
RESOURCES_CREATED_FIELD,
Collections.emptyList()
),
ActionListener.wrap(updateResponse -> {
logger.info("updated workflow {} state to PROVISIONING", request.getWorkflowId());
Expand Down Expand Up @@ -233,7 +237,7 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, ActionListener<
// Attempt to join each workflow step future, may throw a CompletionException if any step completes exceptionally
workflowFutureList.forEach(CompletableFuture::join);

workflowListener.onResponse("READY");
// workflowListener.onResponse("READY");

} catch (IllegalArgumentException e) {
workflowListener.onFailure(new FlowFrameworkException(e.getMessage(), RestStatus.BAD_REQUEST));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,36 +43,19 @@ public class WorkflowRequest extends ActionRequest {
* @param template the use case template which describes the workflow
*/
public WorkflowRequest(@Nullable String workflowId, @Nullable Template template) {
this(workflowId, template, false, false);
this(workflowId, template, false);
}

/**
* The all parameter on the get request
*/
private boolean all;

/**
* Instantiates a new WorkflowRequest
* @param workflowId the documentId of the workflow
* @param template the use case template which describes the workflow
* @param dryRun flag to indicate if validation is necessary
* @param all whether the get request is looking for all fields in status
*/
public WorkflowRequest(@Nullable String workflowId, @Nullable Template template, boolean dryRun, boolean all) {
public WorkflowRequest(@Nullable String workflowId, @Nullable Template template, boolean dryRun) {
this.workflowId = workflowId;
this.template = template;
this.dryRun = dryRun;
this.all = all;
}

/**
* Instantiates a new WorkflowRequest
* @param workflowId the documentId of the workflow
* @param template the use case template which describes the workflow
* @param all whether the get request is looking for all fields in status
*/
public WorkflowRequest(@Nullable String workflowId, @Nullable Template template, boolean all) {
this(workflowId, template, all, false);
}

/**
Expand Down Expand Up @@ -114,14 +97,6 @@ public boolean isDryRun() {
return this.dryRun;
}

/**
* Gets the value of the all parameter
* @return whether the all parameter was present or not in request
*/
public boolean getAll() {
return this.all;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Loading

0 comments on commit 0d77699

Please sign in to comment.