Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Updates exception handling with FlowFrameworkException AND adds dryrun param to Create Workflow #144

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ private CommonValue() {}
public static final String WORKFLOW_URI = FLOW_FRAMEWORK_BASE_URI + "/workflow";
/** Field name for workflow Id, the document Id of the indexed use case template */
public static final String WORKFLOW_ID = "workflow_id";
/** Field name for dry run, the flag to indicate if validation is necessary */
public static final String DRY_RUN = "dryrun";
/** The field name for provision workflow within a use case template*/
public static final String PROVISION_WORKFLOW = "provision";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@
package org.opensearch.flowframework.exception;

import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Representation of Flow Framework Exceptions
*/
public class FlowFrameworkException extends RuntimeException {
public class FlowFrameworkException extends RuntimeException implements ToXContentObject {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -60,4 +64,9 @@ public FlowFrameworkException(String message, Throwable cause, RestStatus restSt
public RestStatus getRestStatus() {
return restStatus;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field("error", this.getMessage()).endObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.google.common.io.Resources;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand All @@ -29,6 +30,7 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.exception.FlowFrameworkException;
Expand Down Expand Up @@ -148,7 +150,7 @@
}
}, e -> {
logger.error("Failed to create index " + indexName, e);
internalListener.onFailure(e);
internalListener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 153 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L153

Added line #L153 was not covered by tests
});
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(indexSettings);
client.admin().indices().create(request, actionListener);
Expand Down Expand Up @@ -181,17 +183,29 @@
);
}
}, exception -> {
logger.error("Failed to update index setting for: " + indexName, exception);
internalListener.onFailure(exception);
String errorMessage = "Failed to update index setting for: " + indexName;
logger.error(errorMessage, exception);
internalListener.onFailure(

Check warning on line 188 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L186-L188

Added lines #L186 - L188 were not covered by tests
new FlowFrameworkException(
errorMessage + " : " + exception.getMessage(),
ExceptionsHelper.status(exception)

Check warning on line 191 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L190-L191

Added lines #L190 - L191 were not covered by tests
)
);
}));
} else {
internalListener.onFailure(
new FlowFrameworkException("Failed to update index: " + indexName, INTERNAL_SERVER_ERROR)
);
}
}, exception -> {
logger.error("Failed to update index " + indexName, exception);
internalListener.onFailure(exception);
String errorMessage = "Failed to update index " + indexName;
logger.error(errorMessage, exception);
internalListener.onFailure(

Check warning on line 203 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L201-L203

Added lines #L201 - L203 were not covered by tests
new FlowFrameworkException(
errorMessage + " : " + exception.getMessage(),
ExceptionsHelper.status(exception)

Check warning on line 206 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L205-L206

Added lines #L205 - L206 were not covered by tests
)
);
})
);
} else {
Expand All @@ -200,17 +214,21 @@
internalListener.onResponse(true);
}
}, e -> {
logger.error("Failed to update index mapping", e);
internalListener.onFailure(e);
String errorMessage = "Failed to update index mapping";
logger.error(errorMessage, e);
internalListener.onFailure(
new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e))

Check warning on line 220 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L217-L220

Added lines #L217 - L220 were not covered by tests
);
}));
} else {
// No need to update index if it's already updated.
internalListener.onResponse(true);
}
}
} catch (Exception e) {
logger.error("Failed to init index " + indexName, e);
listener.onFailure(e);
String errorMessage = "Failed to init index " + indexName;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 231 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L229-L231

Added lines #L229 - L231 were not covered by tests
}
}

Expand Down Expand Up @@ -272,8 +290,9 @@
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to index global_context index");
listener.onFailure(e);
String errorMessage = "Failed to index global_context index";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 295 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L293-L295

Added lines #L293 - L295 were not covered by tests
}
}, e -> {
logger.error("Failed to create global_context index", e);
Expand Down Expand Up @@ -310,13 +329,15 @@
request.id(workflowId);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to put state index document", e);
listener.onFailure(e);
String errorMessage = "Failed to put state index document";
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 334 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L332-L334

Added lines #L332 - L334 were not covered by tests
}

}, e -> {
logger.error("Failed to create global_context index", e);
listener.onFailure(e);
String errorMessage = "Failed to create global_context index";
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 340 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L338-L340

Added lines #L338 - L340 were not covered by tests
}));
}

Expand All @@ -332,7 +353,7 @@
+ documentId
+ ", global_context index does not exist.";
logger.error(exceptionMessage);
listener.onFailure(new Exception(exceptionMessage));
listener.onFailure(new FlowFrameworkException(exceptionMessage, RestStatus.BAD_REQUEST));
} else {
IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX).id(documentId);
try (
Expand All @@ -343,8 +364,9 @@
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to update global_context entry : {}. {}", documentId, e.getMessage());
listener.onFailure(e);
String errorMessage = "Failed to update global_context entry : " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 369 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L367-L369

Added lines #L367 - L369 were not covered by tests
}
}
}
Expand All @@ -365,7 +387,7 @@
if (!doesIndexExist(indexName)) {
String exceptionMessage = "Failed to update document for given workflow due to missing " + indexName + " index";
logger.error(exceptionMessage);
listener.onFailure(new Exception(exceptionMessage));
listener.onFailure(new FlowFrameworkException(exceptionMessage, RestStatus.BAD_REQUEST));

Check warning on line 390 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L390

Added line #L390 was not covered by tests
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest(indexName, documentId);
Expand All @@ -376,8 +398,9 @@
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to update {} entry : {}. {}", indexName, documentId, e.getMessage());
listener.onFailure(e);
String errorMessage = "Failed to update " + indexName + " entry : " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e)));

Check warning on line 403 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L401-L403

Added lines #L401 - L403 were not covered by tests
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@
}
}
if (name == null) {
throw new IOException("An template object requires a name.");
throw new IOException("A template object requires a name.");

Check warning on line 180 in src/main/java/org/opensearch/flowframework/model/Template.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/model/Template.java#L180

Added line #L180 was not covered by tests
}

return new Template(name, description, useCase, templateVersion, compatibilityVersion, workflows, user);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,26 @@
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.DRY_RUN;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;

Expand All @@ -29,6 +37,7 @@
*/
public class RestCreateWorkflowAction extends BaseRestHandler {

private static final Logger logger = LogManager.getLogger(RestCreateWorkflowAction.class);
private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action";

/**
Expand All @@ -53,11 +62,32 @@

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
try {

String workflowId = request.param(WORKFLOW_ID);
Template template = Template.parse(request.content().utf8ToString());
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template);
return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, new RestToXContentListener<>(channel));
String workflowId = request.param(WORKFLOW_ID);
Template template = Template.parse(request.content().utf8ToString());
boolean dryRun = request.paramAsBoolean(DRY_RUN, false);

Check warning on line 69 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L68-L69

Added lines #L68 - L69 were not covered by tests

WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template, dryRun);

Check warning on line 71 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L71

Added line #L71 was not covered by tests

return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.CREATED, builder));
}, exception -> {

Check warning on line 76 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L73-L76

Added lines #L73 - L76 were not covered by tests
try {
FlowFrameworkException ex = (FlowFrameworkException) exception;
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
logger.error("Failed to send back create workflow exception", e);
}
}));

Check warning on line 84 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L78-L84

Added lines #L78 - L84 were not covered by tests
} catch (Exception e) {
FlowFrameworkException ex = new FlowFrameworkException(e.getMessage(), RestStatus.BAD_REQUEST);
return channel -> channel.sendResponse(
new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

import java.io.IOException;
import java.util.List;
Expand All @@ -30,6 +35,8 @@
*/
public class RestProvisionWorkflowAction extends BaseRestHandler {

private static final Logger logger = LogManager.getLogger(RestProvisionWorkflowAction.class);

private static final String PROVISION_WORKFLOW_ACTION = "provision_workflow_action";

/**
Expand All @@ -52,21 +59,35 @@

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {

// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("Invalid request format", RestStatus.BAD_REQUEST);
}

// Validate params
String workflowId = request.param(WORKFLOW_ID);
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
try {
// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("Invalid request format", RestStatus.BAD_REQUEST);
}
// Validate params
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
// Create request and provision
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
return channel -> client.execute(ProvisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {

Check warning on line 77 in src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java#L73-L77

Added lines #L73 - L77 were not covered by tests
try {
FlowFrameworkException ex = (FlowFrameworkException) exception;
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));
} catch (IOException e) {
logger.error("Failed to send back provision workflow exception", e);
}
}));

Check warning on line 85 in src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestProvisionWorkflowAction.java#L79-L85

Added lines #L79 - L85 were not covered by tests
} catch (FlowFrameworkException ex) {
return channel -> channel.sendResponse(
new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}

// Create request and provision
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
return channel -> client.execute(ProvisionWorkflowAction.INSTANCE, workflowRequest, new RestToXContentListener<>(channel));
}

}
Loading
Loading